/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Provides the basics for an RpcClient implementation, such as configuration and logging.
 * <p>
 * Locking schema of the current IPC implementation:
 * <ul>
 * <li>There is a lock in {@link AbstractRpcClient} to protect fetching or creating
 * connections.</li>
 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
 * <li>The same holds for {@link HBaseRpcController} as for {@link Call}. See the comment on
 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
 * for how to deal with cancellation.</li>
 * <li>For connection implementations, the construction of a connection should be as fast as
 * possible because the creation is protected by a lock. Connect to the remote side only when
 * needed. There is no forced locking schema for a connection implementation.</li>
 * <li>For the locking order, the locks in {@link Call} and {@link HBaseRpcController} should be
 * acquired last, so the callbacks in {@link Call} and {@link HBaseRpcController} are executed
 * outside those locks, which means the implementations of the callbacks are free to hold any
 * lock.</li>
 * </ul>
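 * <p>
 * A minimal usage sketch (not taken from this class): callers normally obtain a concrete
 * implementation through {@link RpcClientFactory} and build protobuf stubs on top of the channels
 * it hands out. The server name, timeout, and stub type below are illustrative placeholders only.
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * try (RpcClient client = RpcClientFactory.createClient(conf, clusterId)) {
 *   BlockingRpcChannel channel =
 *       client.createBlockingRpcChannel(serverName, User.getCurrent(), rpcTimeoutMs);
 *   // e.g. ClientService.BlockingInterface stub = ClientService.newBlockingStub(channel);
 * }
 * }</pre>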
 * @since 2.0.0
 */
@InterfaceAudience.Private
public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
  // Log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);

  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
      Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);

  private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
      .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
      justification="the rest of the system, which lives in different packages, needs access to this map")
  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();

  static {
    TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
  }

  protected boolean running = true; // if client runs

  protected final Configuration conf;
  protected final String clusterId;
  protected final SocketAddress localAddr;
  protected final MetricsConnection metrics;

  protected final UserProvider userProvider;
  protected final CellBlockBuilder cellBlockBuilder;

  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
  // time (in ms), it will be closed at any moment.
  protected final int maxRetries; // the max. no. of retries for socket connections
  protected final long failureSleep; // Time to sleep before retry on failure.
  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives
  protected final Codec codec;
  protected final CompressionCodec compressor;
  protected final boolean fallbackAllowed;

  protected final FailedServers failedServers;

  protected final int connectTO;
  protected final int readTO;
  protected final int writeTO;

  protected final PoolMap<ConnectionId, T> connections;

  private final AtomicInteger callIdCnt = new AtomicInteger(0);

  private final ScheduledFuture<?> cleanupIdleConnectionTask;

  private int maxConcurrentCallsPerServer;

  private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
      CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
          build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
            @Override public AtomicInteger load(InetSocketAddress key) throws Exception {
              return new AtomicInteger(0);
            }
          });

  /**
   * Construct an IPC client for the cluster <code>clusterId</code>
   * @param conf configuration
   * @param clusterId the cluster id
   * @param localAddr client socket bind address.
   * @param metrics the connection metrics
   */
  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
      MetricsConnection metrics) {
    this.userProvider = UserProvider.instantiate(conf);
    this.localAddr = localAddr;
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
    this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
    this.cellBlockBuilder = new CellBlockBuilder(conf);

    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
    this.conf = conf;
    this.codec = getCodec();
    this.compressor = getCompressor(conf);
    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
    this.failedServers = new FailedServers(conf);
    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
    this.metrics = metrics;
    this.maxConcurrentCallsPerServer = conf.getInt(
        HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);

    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));

    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {

      @Override
      public void run() {
        cleanupIdleConnections();
      }
    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
          + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
          + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
          + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
          + this.fallbackAllowed + ", bind address="
          + (this.localAddr != null ? this.localAddr : "null"));
    }
  }
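
  // A tuning sketch (not part of the original source; the values below are illustrative only).
  // The socket timeouts, idle-sweep interval and per-server request threshold used in the
  // constructor are all read from the Configuration passed in, e.g.:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(RpcClient.SOCKET_TIMEOUT_CONNECT, 10000); // connectTO, ms
  //   conf.setInt(RpcClient.SOCKET_TIMEOUT_READ, 20000);    // readTO, ms
  //   conf.setInt(RpcClient.SOCKET_TIMEOUT_WRITE, 60000);   // writeTO, ms
  //   conf.setInt(RpcClient.IDLE_TIME, 120000);             // minIdleTimeBeforeClose, ms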

  private void cleanupIdleConnections() {
    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
    synchronized (connections) {
      for (T conn : connections.values()) {
        // Remove the connection if it has not been chosen by anyone for more than maxIdleTime, and
        // the connection itself has already shut down. The latter check is because we may still
        // have some pending calls on the connection, so we should not shut it down from the
        // outside. The connection itself will disconnect if there is no pending call for
        // maxIdleTime.
        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Cleanup idle connection to " + conn.remoteId().address);
          }
          connections.removeValue(conn.remoteId(), conn);
          conn.cleanupConnection();
        }
      }
    }
  }

  @VisibleForTesting
  public static String getDefaultCodec(final Configuration c) {
    // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
    // Configuration will complain -- then no default codec (and we'll pb everything). Else
    // default is KeyValueCodec
    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
  }

  /**
   * Encapsulate the ugly casting and RuntimeException conversion in a helper method.
   * @return Codec to use on this client.
   */
  Codec getCodec() {
    // For NO CODEC, "hbase.client.rpc.codec" must be configured as an empty string AND
    // "hbase.client.default.rpc.codec" as well -- because the default is to do cell block encoding.
    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
    if (className == null || className.length() == 0) {
      return null;
    }
    try {
      return (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting codec " + className, e);
    }
  }
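
  // A configuration sketch (not part of this class; shown only to illustrate the two keys read
  // above). Disabling cell-block encoding entirely requires clearing both the per-client codec
  // and the default codec, for example:
  //
  //   conf.set(HConstants.RPC_CODEC_CONF_KEY, "");   // "hbase.client.rpc.codec"
  //   conf.set(RpcClient.DEFAULT_CODEC_CLASS, "");   // "hbase.client.default.rpc.codec"
  //
  // With both empty, getCodec() returns null and hasCellBlockSupport() below reports false.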

  @Override
  public boolean hasCellBlockSupport() {
    return this.codec != null;
  }

  // For writing tests that want to throw an exception when connecting.
  @VisibleForTesting
  boolean isTcpNoDelay() {
    return tcpNoDelay;
  }

  /**
   * Encapsulate the ugly casting and RuntimeException conversion in a private method.
   * @param conf configuration
   * @return The compressor to use on this client.
   */
  private static CompressionCodec getCompressor(final Configuration conf) {
    String className = conf.get("hbase.client.rpc.compressor", null);
    if (className == null || className.isEmpty()) {
      return null;
    }
    try {
      return (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting compressor " + className, e);
    }
  }

  /**
   * Return the pool type specified in the configuration, which must be set to either
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
   * former. For applications with many user threads, use a small round-robin pool. For
   * applications with few user threads, you may want to try using a thread-local pool. In any
   * case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed
   * the operating system's hard limit on the number of connections.
   * @param config configuration
   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
   */
  private static PoolMap.PoolType getPoolType(Configuration config) {
    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
      PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal);
  }

  /**
   * Return the pool size specified in the configuration, which is applicable only if the pool type
   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
   * @param config configuration
   * @return the maximum pool size
   */
  private static int getPoolSize(Configuration config) {
    return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
  }
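
  // A configuration sketch (not part of this class; values are illustrative). The pool of
  // connections per ConnectionId is shaped by the two keys read by getPoolType and getPoolSize
  // above, e.g.:
  //
  //   conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "RoundRobin"); // or "ThreadLocal"
  //   conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 5);         // only used for RoundRobin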
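  // Returns the next call id, wrapping back to 0 after Integer.MAX_VALUE so ids stay
  // non-negative. A CAS loop is used instead of incrementAndGet() to avoid ever handing out a
  // negative id on overflow.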
  private int nextCallId() {
    int id, next;
    do {
      id = callIdCnt.get();
      next = id < Integer.MAX_VALUE ? id + 1 : 0;
    } while (!callIdCnt.compareAndSet(id, next));
    return id;
  }

  /**
   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
   *          {@link UserProvider#getCurrent()} makes a new instance of User each time, so there
   *          will be a new Connection each time.
   * @return The Message response; cell data, if any, is delivered through the controller.
   */
  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
      Message param, Message returnType, final User ticket, final InetSocketAddress isa)
      throws ServiceException {
    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
    callMethod(md, hrc, param, returnType, ticket, isa, done);
    Message val;
    try {
      val = done.get();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    if (hrc.failed()) {
      throw new ServiceException(hrc.getFailed());
    } else {
      return val;
    }
  }

  /**
   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
   * given host/port are reused.
   */
  private T getConnection(ConnectionId remoteId) throws IOException {
    if (failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.address
            + " because this server is in the failed servers list");
      }
      throw new FailedServerException(
          "This server is in the failed servers list: " + remoteId.address);
    }
    T conn;
    synchronized (connections) {
      if (!running) {
        throw new StoppedRpcClientException();
      }
      conn = connections.get(remoteId);
      if (conn == null) {
        conn = createConnection(remoteId);
        connections.put(remoteId, conn);
      }
      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
    }
    return conn;
  }

  /**
   * Create a connection object for the given remote id. The returned connection is not yet
   * connected; implementations should connect to the remote side lazily, when needed.
   */
  protected abstract T createConnection(ConnectionId remoteId) throws IOException;

  private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
      RpcCallback<Message> callback) {
    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
    if (metrics != null) {
      metrics.updateRpc(call.md, call.param, call.callStats);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
    }
    if (call.error != null) {
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        hrc.setFailed(call.error);
      } else {
        hrc.setFailed(wrapException(addr, call.error));
      }
      callback.run(null);
    } else {
      hrc.setDone(call.cells);
      callback.run(call.response);
    }
  }

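  // Sends an asynchronous call. The number of in-flight calls per server is tracked in
  // concurrentCounterCache; once it exceeds maxConcurrentCallsPerServer
  // ("hbase.client.perserver.requests.threshold") the call fails fast with
  // ServerTooBusyException instead of being queued on the connection.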
  private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
      final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
      final RpcCallback<Message> callback) {
    final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
    cs.setStartTime(EnvironmentEdgeManager.currentTime());

    if (param instanceof ClientProtos.MultiRequest) {
      ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
      int numActions = 0;
      for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
        numActions += regionAction.getActionCount();
      }

      cs.setNumActionsPerServer(numActions);
    }

    final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
    Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
          @Override
          public void run(Call call) {
            counter.decrementAndGet();
            onCallFinished(call, hrc, addr, callback);
          }
        }, cs);
    ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
    int count = counter.incrementAndGet();
    try {
      if (count > maxConcurrentCallsPerServer) {
        throw new ServerTooBusyException(addr, count);
      }
      cs.setConcurrentCallsPerServer(count);
      T connection = getConnection(remoteId);
      connection.sendRequest(call, hrc);
    } catch (Exception e) {
      call.setException(toIOE(e));
    }
  }

  private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
    InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
    if (addr.isUnresolved()) {
      throw new UnknownHostException("can not resolve " + sn.getServerName());
    }
    return addr;
  }

  /**
   * Interrupt the connections to the given ip:port server. This should be called if the server is
   * known to be dead. This will not prevent current operations from being retried, and, depending
   * on their own behavior, they may retry on the same server. This can be a feature, for example
   * at startup. In any case, they're likely to get a connection refused (if the process died) or
   * no route to host: i.e. their next retries should be faster and with a safe exception.
   */
  @Override
  public void cancelConnections(ServerName sn) {
    synchronized (connections) {
      for (T connection : connections.values()) {
        ConnectionId remoteId = connection.remoteId();
        if (remoteId.address.getPort() == sn.getPort()
            && remoteId.address.getHostName().equals(sn.getHostname())) {
          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
              + connection.remoteId);
          connections.removeValue(remoteId, connection);
          connection.shutdown();
          connection.cleanupConnection();
        }
      }
    }
  }

  /**
   * Configure an HBaseRpcController.
   * @param controller to configure
   * @param channelOperationTimeout timeout for the operation
   * @return configured controller
   */
  static HBaseRpcController configureHBaseRpcController(
      RpcController controller, int channelOperationTimeout) {
    HBaseRpcController hrc;
    if (controller != null && controller instanceof HBaseRpcController) {
      hrc = (HBaseRpcController) controller;
      if (!hrc.hasCallTimeout()) {
        hrc.setCallTimeout(channelOperationTimeout);
      }
    } else {
      hrc = new HBaseRpcControllerImpl();
      hrc.setCallTimeout(channelOperationTimeout);
    }
    return hrc;
  }

  protected abstract void closeInternal();

  @Override
  public void close() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stopping rpc client");
    }
    Collection<T> connToClose;
    synchronized (connections) {
      if (!running) {
        return;
      }
      running = false;
      connToClose = connections.values();
      connections.clear();
    }
    cleanupIdleConnectionTask.cancel(true);
    for (T conn : connToClose) {
      conn.shutdown();
    }
    closeInternal();
    for (T conn : connToClose) {
      conn.cleanupConnection();
    }
  }

  @Override
  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
      int rpcTimeout) throws UnknownHostException {
    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
  }

  @Override
  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
      throws UnknownHostException {
    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
  }

  private static class AbstractRpcChannel {

    protected final InetSocketAddress addr;

    protected final AbstractRpcClient<?> rpcClient;

    protected final User ticket;

    protected final int rpcTimeout;

    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
        User ticket, int rpcTimeout) {
      this.addr = addr;
      this.rpcClient = rpcClient;
      this.ticket = ticket;
      this.rpcTimeout = rpcTimeout;
    }

    /**
     * Configure an RpcController.
     * @param controller to configure
     * @return configured rpc controller
     */
    protected HBaseRpcController configureRpcController(RpcController controller) {
      HBaseRpcController hrc;
      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
      // side. But currently a ServerRpcController may also be passed in here.
      if (controller != null && controller instanceof HBaseRpcController) {
        hrc = (HBaseRpcController) controller;
        if (!hrc.hasCallTimeout()) {
          hrc.setCallTimeout(rpcTimeout);
        }
      } else {
        hrc = new HBaseRpcControllerImpl();
        hrc.setCallTimeout(rpcTimeout);
      }
      return hrc;
    }
  }

  /**
   * Blocking RPC channel that goes via HBase RPC.
   */
  @VisibleForTesting
  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
      implements BlockingRpcChannel {

    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
        InetSocketAddress addr, User ticket, int rpcTimeout) {
      super(rpcClient, addr, ticket, rpcTimeout);
    }

    @Override
    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
        Message param, Message returnType) throws ServiceException {
      return rpcClient.callBlockingMethod(md, configureRpcController(controller),
        param, returnType, ticket, addr);
    }
  }

  /**
   * Async RPC channel that goes via HBase RPC.
   */
  public static class RpcChannelImplementation extends AbstractRpcChannel implements
      RpcChannel {

    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
        User ticket, int rpcTimeout) throws UnknownHostException {
      super(rpcClient, addr, ticket, rpcTimeout);
    }

    @Override
    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
        Message param, Message returnType, RpcCallback<Message> done) {
      // This method does not throw any exceptions, so the caller must provide an
      // HBaseRpcController, which is used to pass the exceptions.
      this.rpcClient.callMethod(md,
        configureRpcController(Preconditions.checkNotNull(controller,
          "RpcController can not be null for async rpc call")),
        param, returnType, ticket, addr, done);
    }
  }
}