001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.ipc;
020
021import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
023
024import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
025import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
026import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
027import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
028import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
029import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
030import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
031import org.apache.hbase.thirdparty.com.google.protobuf.Message;
032import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
033import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
034import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
035import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
036
037import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
038
039import java.io.IOException;
040import java.net.InetSocketAddress;
041import java.net.SocketAddress;
042import java.net.UnknownHostException;
043import java.util.Collection;
044import java.util.HashMap;
045import java.util.Map;
046import java.util.concurrent.Executors;
047import java.util.concurrent.ScheduledExecutorService;
048import java.util.concurrent.ScheduledFuture;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.atomic.AtomicInteger;
051
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.ServerName;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058import org.apache.hadoop.hbase.client.MetricsConnection;
059import org.apache.hadoop.hbase.codec.Codec;
060import org.apache.hadoop.hbase.codec.KeyValueCodec;
061import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
062import org.apache.hadoop.hbase.security.User;
063import org.apache.hadoop.hbase.security.UserProvider;
064import org.apache.hadoop.hbase.security.token.AuthenticationTokenSelector;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.hbase.util.PoolMap;
067import org.apache.hadoop.hbase.util.Threads;
068import org.apache.hadoop.io.compress.CompressionCodec;
069import org.apache.hadoop.ipc.RemoteException;
070import org.apache.hadoop.security.token.TokenIdentifier;
071import org.apache.hadoop.security.token.TokenSelector;
072
073/**
074 * Provides the basics for a RpcClient implementation like configuration and Logging.
075 * <p>
076 * Locking schema of the current IPC implementation
077 * <ul>
078 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating
079 * connection.</li>
080 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
081 * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
082 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
083 * of how to deal with cancel.</li>
084 * <li>For connection implementation, the construction of a connection should be as fast as possible
085 * because the creation is protected under a lock. Connect to remote side when needed. There is no
086 * forced locking schema for a connection implementation.</li>
 * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s locks should be
 * acquired last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
 * outside the lock in {@link Call} and {@link HBaseRpcController}, which means the implementations
 * of the callbacks are free to hold any lock.</li>
091 * </ul>
092 * @since 2.0.0
093 */
094@InterfaceAudience.Private
095public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
096  // Log level is being changed in tests
097  public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
098
099  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
100      Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
101
102  private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
103      .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
104
105  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT",
106      justification="the rest of the system which live in the different package can use")
107  protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>();
108
109  static {
110    TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector());
111  }
112
113  protected boolean running = true; // if client runs
114
115  protected final Configuration conf;
116  protected final String clusterId;
117  protected final SocketAddress localAddr;
118  protected final MetricsConnection metrics;
119
120  protected final UserProvider userProvider;
121  protected final CellBlockBuilder cellBlockBuilder;
122
123  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
124  // time (in ms), it will be closed at any moment.
125  protected final int maxRetries; // the max. no. of retries for socket connections
126  protected final long failureSleep; // Time to sleep before retry on failure.
127  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
128  protected final boolean tcpKeepAlive; // if T then use keepalives
129  protected final Codec codec;
130  protected final CompressionCodec compressor;
131  protected final boolean fallbackAllowed;
132
133  protected final FailedServers failedServers;
134
135  protected final int connectTO;
136  protected final int readTO;
137  protected final int writeTO;
138
139  protected final PoolMap<ConnectionId, T> connections;
140
141  private final AtomicInteger callIdCnt = new AtomicInteger(0);
142
143  private final ScheduledFuture<?> cleanupIdleConnectionTask;
144
145  private int maxConcurrentCallsPerServer;
146
147  private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
148      CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
149          build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
150            @Override public AtomicInteger load(InetSocketAddress key) throws Exception {
151              return new AtomicInteger(0);
152            }
153          });
154
155  /**
156   * Construct an IPC client for the cluster <code>clusterId</code>
157   * @param conf configuration
158   * @param clusterId the cluster id
159   * @param localAddr client socket bind address.
160   * @param metrics the connection metrics
161   */
162  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
163      MetricsConnection metrics) {
164    this.userProvider = UserProvider.instantiate(conf);
165    this.localAddr = localAddr;
166    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
167    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
168    this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
169      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
170    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
171    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
172    this.cellBlockBuilder = new CellBlockBuilder(conf);
173
174    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
175    this.conf = conf;
176    this.codec = getCodec();
177    this.compressor = getCompressor(conf);
178    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
179      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
180    this.failedServers = new FailedServers(conf);
181    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
182    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
183    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
184    this.metrics = metrics;
185    this.maxConcurrentCallsPerServer = conf.getInt(
186        HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
187        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
188
189    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
190
191    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {
192
193      @Override
194      public void run() {
195        cleanupIdleConnections();
196      }
197    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
198
199    if (LOG.isDebugEnabled()) {
200      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
201          + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
202          + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
203          + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
204          + this.fallbackAllowed + ", bind address="
205          + (this.localAddr != null ? this.localAddr : "null"));
206    }
207  }
208
209  private void cleanupIdleConnections() {
210    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
211    synchronized (connections) {
212      for (T conn : connections.values()) {
213        // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
214        // connection itself has already shutdown. The latter check is because we may still
215        // have some pending calls on connection so we should not shutdown the connection outside.
216        // The connection itself will disconnect if there is no pending call for maxIdleTime.
217        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
218          if (LOG.isTraceEnabled()) LOG.trace("Cleanup idle connection to " + conn.remoteId().address);
219          connections.removeValue(conn.remoteId(), conn);
220          conn.cleanupConnection();
221        }
222      }
223    }
224  }
225
226  @VisibleForTesting
227  public static String getDefaultCodec(final Configuration c) {
228    // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
229    // Configuration will complain -- then no default codec (and we'll pb everything). Else
230    // default is KeyValueCodec
231    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
232  }
233
234  /**
235   * Encapsulate the ugly casting and RuntimeException conversion in private method.
236   * @return Codec to use on this client.
237   */
238  Codec getCodec() {
239    // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
240    // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
241    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
242    if (className == null || className.length() == 0) {
243      return null;
244    }
245    try {
246      return (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
247    } catch (Exception e) {
248      throw new RuntimeException("Failed getting codec " + className, e);
249    }
250  }
251
252  @Override
253  public boolean hasCellBlockSupport() {
254    return this.codec != null;
255  }
256
257  // for writing tests that want to throw exception when connecting.
258  @VisibleForTesting
259  boolean isTcpNoDelay() {
260    return tcpNoDelay;
261  }
262
263  /**
264   * Encapsulate the ugly casting and RuntimeException conversion in private method.
265   * @param conf configuration
266   * @return The compressor to use on this client.
267   */
268  private static CompressionCodec getCompressor(final Configuration conf) {
269    String className = conf.get("hbase.client.rpc.compressor", null);
270    if (className == null || className.isEmpty()) {
271      return null;
272    }
273    try {
274      return (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
275    } catch (Exception e) {
276      throw new RuntimeException("Failed getting compressor " + className, e);
277    }
278  }
279
280  /**
281   * Return the pool type specified in the configuration, which must be set to either
282   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
283   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
284   * former. For applications with many user threads, use a small round-robin pool. For applications
285   * with few user threads, you may want to try using a thread-local pool. In any case, the number
286   * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
287   * system's hard limit on the number of connections.
288   * @param config configuration
289   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
290   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
291   */
292  private static PoolMap.PoolType getPoolType(Configuration config) {
293    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
294      PoolMap.PoolType.RoundRobin, PoolMap.PoolType.ThreadLocal);
295  }
296
297  /**
298   * Return the pool size specified in the configuration, which is applicable only if the pool type
299   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
300   * @param config configuration
301   * @return the maximum pool size
302   */
303  private static int getPoolSize(Configuration config) {
304    return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
305  }
306
307  private int nextCallId() {
308    int id, next;
309    do {
310      id = callIdCnt.get();
311      next = id < Integer.MAX_VALUE ? id + 1 : 0;
312    } while (!callIdCnt.compareAndSet(id, next));
313    return id;
314  }
315
316  /**
317   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
318   * threw an exception.
319   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
320   *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
321   *          new Connection each time.
322   * @return A pair with the Message response and the Cell data (if any).
323   */
324  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
325      Message param, Message returnType, final User ticket, final InetSocketAddress isa)
326      throws ServiceException {
327    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
328    callMethod(md, hrc, param, returnType, ticket, isa, done);
329    Message val;
330    try {
331      val = done.get();
332    } catch (IOException e) {
333      throw new ServiceException(e);
334    }
335    if (hrc.failed()) {
336      throw new ServiceException(hrc.getFailed());
337    } else {
338      return val;
339    }
340  }
341
342  /**
343   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
344   * given host/port are reused.
345   */
346  private T getConnection(ConnectionId remoteId) throws IOException {
347    if (failedServers.isFailedServer(remoteId.getAddress())) {
348      if (LOG.isDebugEnabled()) {
349        LOG.debug("Not trying to connect to " + remoteId.address
350            + " this server is in the failed servers list");
351      }
352      throw new FailedServerException(
353          "This server is in the failed servers list: " + remoteId.address);
354    }
355    T conn;
356    synchronized (connections) {
357      if (!running) {
358        throw new StoppedRpcClientException();
359      }
360      conn = connections.get(remoteId);
361      if (conn == null) {
362        conn = createConnection(remoteId);
363        connections.put(remoteId, conn);
364      }
365      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
366    }
367    return conn;
368  }
369
  /**
   * Creates a connection object for the given remote id. Not connected.
   * <p>
   * This is invoked while the {@code connections} monitor is held, so implementations should
   * return as fast as possible and connect to the remote side only when needed.
   * @param remoteId identifies the remote endpoint the connection is for
   * @return a new connection for {@code remoteId}
   * @throws IOException if the connection object cannot be created
   */
  protected abstract T createConnection(ConnectionId remoteId) throws IOException;
374
375  private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
376      RpcCallback<Message> callback) {
377    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
378    if (metrics != null) {
379      metrics.updateRpc(call.md, call.param, call.callStats);
380    }
381    if (LOG.isTraceEnabled()) {
382      LOG.trace(
383        "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
384    }
385    if (call.error != null) {
386      if (call.error instanceof RemoteException) {
387        call.error.fillInStackTrace();
388        hrc.setFailed(call.error);
389      } else {
390        hrc.setFailed(wrapException(addr, call.error));
391      }
392      callback.run(null);
393    } else {
394      hrc.setDone(call.cells);
395      callback.run(call.response);
396    }
397  }
398
399  private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
400      final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
401      final RpcCallback<Message> callback) {
402    final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
403    cs.setStartTime(EnvironmentEdgeManager.currentTime());
404    final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
405    Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
406        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
407          @Override
408          public void run(Call call) {
409            counter.decrementAndGet();
410            onCallFinished(call, hrc, addr, callback);
411          }
412        }, cs);
413    ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
414    int count = counter.incrementAndGet();
415    try {
416      if (count > maxConcurrentCallsPerServer) {
417        throw new ServerTooBusyException(addr, count);
418      }
419      cs.setConcurrentCallsPerServer(count);
420      T connection = getConnection(remoteId);
421      connection.sendRequest(call, hrc);
422    } catch (Exception e) {
423      call.setException(toIOE(e));
424    }
425  }
426
427  private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
428    InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
429    if (addr.isUnresolved()) {
430      throw new UnknownHostException("can not resolve " + sn.getServerName());
431    }
432    return addr;
433  }
434
435  /**
436   * Interrupt the connections to the given ip:port server. This should be called if the server is
437   * known as actually dead. This will not prevent current operation to be retried, and, depending
438   * on their own behavior, they may retry on the same server. This can be a feature, for example at
439   * startup. In any case, they're likely to get connection refused (if the process died) or no
440   * route to host: i.e. their next retries should be faster and with a safe exception.
441   */
442  @Override
443  public void cancelConnections(ServerName sn) {
444    synchronized (connections) {
445      for (T connection : connections.values()) {
446        ConnectionId remoteId = connection.remoteId();
447        if (remoteId.address.getPort() == sn.getPort()
448            && remoteId.address.getHostName().equals(sn.getHostname())) {
449          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
450              + connection.remoteId);
451          connections.removeValue(remoteId, connection);
452          connection.shutdown();
453        }
454      }
455    }
456  }
457  /**
458   * Configure an hbase rpccontroller
459   * @param controller to configure
460   * @param channelOperationTimeout timeout for operation
461   * @return configured controller
462   */
463  static HBaseRpcController configureHBaseRpcController(
464      RpcController controller, int channelOperationTimeout) {
465    HBaseRpcController hrc;
466    if (controller != null && controller instanceof HBaseRpcController) {
467      hrc = (HBaseRpcController) controller;
468      if (!hrc.hasCallTimeout()) {
469        hrc.setCallTimeout(channelOperationTimeout);
470      }
471    } else {
472      hrc = new HBaseRpcControllerImpl();
473      hrc.setCallTimeout(channelOperationTimeout);
474    }
475    return hrc;
476  }
477
  /**
   * Implementation-specific part of {@link #close()}. It is invoked once per client, after every
   * pooled connection has been shut down and before their {@code cleanupConnection()} is called.
   */
  protected abstract void closeInternal();
479
480  @Override
481  public void close() {
482    if (LOG.isDebugEnabled()) {
483      LOG.debug("Stopping rpc client");
484    }
485    Collection<T> connToClose;
486    synchronized (connections) {
487      if (!running) {
488        return;
489      }
490      running = false;
491      connToClose = connections.values();
492      connections.clear();
493    }
494    cleanupIdleConnectionTask.cancel(true);
495    for (T conn : connToClose) {
496      conn.shutdown();
497    }
498    closeInternal();
499    for (T conn : connToClose) {
500      conn.cleanupConnection();
501    }
502  }
503
504  @Override
505  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
506      int rpcTimeout) throws UnknownHostException {
507    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
508  }
509
510  @Override
511  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
512      throws UnknownHostException {
513    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
514  }
515
516  private static class AbstractRpcChannel {
517
518    protected final InetSocketAddress addr;
519
520    protected final AbstractRpcClient<?> rpcClient;
521
522    protected final User ticket;
523
524    protected final int rpcTimeout;
525
526    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
527        User ticket, int rpcTimeout) {
528      this.addr = addr;
529      this.rpcClient = rpcClient;
530      this.ticket = ticket;
531      this.rpcTimeout = rpcTimeout;
532    }
533
534    /**
535     * Configure an rpc controller
536     * @param controller to configure
537     * @return configured rpc controller
538     */
539    protected HBaseRpcController configureRpcController(RpcController controller) {
540      HBaseRpcController hrc;
541      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
542      // side. And now we may use ServerRpcController.
543      if (controller != null && controller instanceof HBaseRpcController) {
544        hrc = (HBaseRpcController) controller;
545        if (!hrc.hasCallTimeout()) {
546          hrc.setCallTimeout(rpcTimeout);
547        }
548      } else {
549        hrc = new HBaseRpcControllerImpl();
550        hrc.setCallTimeout(rpcTimeout);
551      }
552      return hrc;
553    }
554  }
555
556  /**
557   * Blocking rpc channel that goes via hbase rpc.
558   */
559  @VisibleForTesting
560  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
561      implements BlockingRpcChannel {
562
563    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
564        InetSocketAddress addr, User ticket, int rpcTimeout) {
565      super(rpcClient, addr, ticket, rpcTimeout);
566    }
567
568    @Override
569    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
570        Message param, Message returnType) throws ServiceException {
571      return rpcClient.callBlockingMethod(md, configureRpcController(controller),
572        param, returnType, ticket, addr);
573    }
574  }
575
576  /**
577   * Async rpc channel that goes via hbase rpc.
578   */
579  public static class RpcChannelImplementation extends AbstractRpcChannel implements
580      RpcChannel {
581
582    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
583        User ticket, int rpcTimeout) throws UnknownHostException {
584      super(rpcClient, addr, ticket, rpcTimeout);
585    }
586
587    @Override
588    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
589        Message param, Message returnType, RpcCallback<Message> done) {
590      // This method does not throw any exceptions, so the caller must provide a
591      // HBaseRpcController which is used to pass the exceptions.
592      this.rpcClient.callMethod(md,
593        configureRpcController(Preconditions.checkNotNull(controller,
594          "RpcController can not be null for async rpc call")),
595        param, returnType, ticket, addr, done);
596    }
597  }
598}