001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.ipc;
020
021import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
022import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
023
024import java.io.IOException;
025import java.net.InetSocketAddress;
026import java.net.SocketAddress;
027import java.net.UnknownHostException;
028import java.util.Collection;
029import java.util.concurrent.Executors;
030import java.util.concurrent.ScheduledExecutorService;
031import java.util.concurrent.ScheduledFuture;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.client.MetricsConnection;
038import org.apache.hadoop.hbase.codec.Codec;
039import org.apache.hadoop.hbase.codec.KeyValueCodec;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.security.UserProvider;
042import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
043import org.apache.hadoop.hbase.util.PoolMap;
044import org.apache.hadoop.hbase.util.Threads;
045import org.apache.hadoop.io.compress.CompressionCodec;
046import org.apache.hadoop.ipc.RemoteException;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
052import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
053import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
054import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
055import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
056import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
057import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
058import org.apache.hbase.thirdparty.com.google.protobuf.Message;
059import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
060import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
061import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
062import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
063import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
066
067/**
068 * Provides the basics for a RpcClient implementation like configuration and Logging.
069 * <p>
070 * Locking schema of the current IPC implementation
071 * <ul>
072 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating
073 * connection.</li>
074 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
 * <li>The same applies to {@link HBaseRpcController} as to {@link Call}. See the comment on
 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
 * for how to deal with cancellation.</li>
078 * <li>For connection implementation, the construction of a connection should be as fast as possible
079 * because the creation is protected under a lock. Connect to remote side when needed. There is no
080 * forced locking schema for a connection implementation.</li>
 * <li>For the locking order, the locks of {@link Call} and {@link HBaseRpcController} should be
 * acquired last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
 * outside the lock in {@link Call} and {@link HBaseRpcController}, which means the implementations
 * of the callbacks are free to hold any lock.</li>
085 * </ul>
086 * @since 2.0.0
087 */
088@InterfaceAudience.Private
089public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
090  // Log level is being changed in tests
091  public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
092
093  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
094    new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
095      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
096    10, TimeUnit.MILLISECONDS);
097
098  private static final ScheduledExecutorService IDLE_CONN_SWEEPER =
099    Executors.newScheduledThreadPool(1,
100      new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
101        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
102
103  private boolean running = true; // if client runs
104
105  protected final Configuration conf;
106  protected final String clusterId;
107  protected final SocketAddress localAddr;
108  protected final MetricsConnection metrics;
109
110  protected final UserProvider userProvider;
111  protected final CellBlockBuilder cellBlockBuilder;
112
113  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
114  // time (in ms), it will be closed at any moment.
115  protected final int maxRetries; // the max. no. of retries for socket connections
116  protected final long failureSleep; // Time to sleep before retry on failure.
117  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
118  protected final boolean tcpKeepAlive; // if T then use keepalives
119  protected final Codec codec;
120  protected final CompressionCodec compressor;
121  protected final boolean fallbackAllowed;
122
123  protected final FailedServers failedServers;
124
125  protected final int connectTO;
126  protected final int readTO;
127  protected final int writeTO;
128
129  private final PoolMap<ConnectionId, T> connections;
130
131  private final AtomicInteger callIdCnt = new AtomicInteger(0);
132
133  private final ScheduledFuture<?> cleanupIdleConnectionTask;
134
135  private int maxConcurrentCallsPerServer;
136
137  private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
138      CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
139          build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
140            @Override public AtomicInteger load(InetSocketAddress key) throws Exception {
141              return new AtomicInteger(0);
142            }
143          });
144
145  /**
146   * Construct an IPC client for the cluster <code>clusterId</code>
147   * @param conf configuration
148   * @param clusterId the cluster id
149   * @param localAddr client socket bind address.
150   * @param metrics the connection metrics
151   */
152  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
153      MetricsConnection metrics) {
154    this.userProvider = UserProvider.instantiate(conf);
155    this.localAddr = localAddr;
156    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
157    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
158    this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
159      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
160    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
161    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
162    this.cellBlockBuilder = new CellBlockBuilder(conf);
163
164    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
165    this.conf = conf;
166    this.codec = getCodec();
167    this.compressor = getCompressor(conf);
168    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
169      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
170    this.failedServers = new FailedServers(conf);
171    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
172    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
173    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
174    this.metrics = metrics;
175    this.maxConcurrentCallsPerServer = conf.getInt(
176        HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
177        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
178
179    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
180
181    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {
182
183      @Override
184      public void run() {
185        cleanupIdleConnections();
186      }
187    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
188
189    if (LOG.isDebugEnabled()) {
190      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
191          + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
192          + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
193          + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
194          + this.fallbackAllowed + ", bind address="
195          + (this.localAddr != null ? this.localAddr : "null"));
196    }
197  }
198
199  private void cleanupIdleConnections() {
200    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
201    synchronized (connections) {
202      for (T conn : connections.values()) {
203        // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
204        // connection itself has already shutdown. The latter check is because we may still
205        // have some pending calls on connection so we should not shutdown the connection outside.
206        // The connection itself will disconnect if there is no pending call for maxIdleTime.
207        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
208          if (LOG.isTraceEnabled()) {
209            LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
210          }
211          connections.remove(conn.remoteId(), conn);
212          conn.cleanupConnection();
213        }
214      }
215    }
216  }
217
218  public static String getDefaultCodec(final Configuration c) {
219    // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
220    // Configuration will complain -- then no default codec (and we'll pb everything). Else
221    // default is KeyValueCodec
222    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
223  }
224
225  /**
226   * Encapsulate the ugly casting and RuntimeException conversion in private method.
227   * @return Codec to use on this client.
228   */
229  Codec getCodec() {
230    // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
231    // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
232    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
233    if (className == null || className.length() == 0) {
234      return null;
235    }
236    try {
237      return (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
238    } catch (Exception e) {
239      throw new RuntimeException("Failed getting codec " + className, e);
240    }
241  }
242
243  @Override
244  public boolean hasCellBlockSupport() {
245    return this.codec != null;
246  }
247
248  // for writing tests that want to throw exception when connecting.
249  boolean isTcpNoDelay() {
250    return tcpNoDelay;
251  }
252
253  /**
254   * Encapsulate the ugly casting and RuntimeException conversion in private method.
255   * @param conf configuration
256   * @return The compressor to use on this client.
257   */
258  private static CompressionCodec getCompressor(final Configuration conf) {
259    String className = conf.get("hbase.client.rpc.compressor", null);
260    if (className == null || className.isEmpty()) {
261      return null;
262    }
263    try {
264      return (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
265    } catch (Exception e) {
266      throw new RuntimeException("Failed getting compressor " + className, e);
267    }
268  }
269
270  /**
271   * Return the pool type specified in the configuration, which must be set to either
272   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
273   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
274   * former. For applications with many user threads, use a small round-robin pool. For applications
275   * with few user threads, you may want to try using a thread-local pool. In any case, the number
276   * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
277   * system's hard limit on the number of connections.
278   * @param config configuration
279   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
280   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
281   */
282  private static PoolMap.PoolType getPoolType(Configuration config) {
283    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
284      PoolMap.PoolType.RoundRobin);
285  }
286
287  /**
288   * Return the pool size specified in the configuration, which is applicable only if the pool type
289   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
290   * @param config configuration
291   * @return the maximum pool size
292   */
293  private static int getPoolSize(Configuration config) {
294    int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
295
296    if (poolSize <= 0) {
297      LOG.warn("{} must be positive. Using default value: 1", HConstants.HBASE_CLIENT_IPC_POOL_SIZE);
298      return 1;
299    } else {
300      return poolSize;
301    }
302  }
303
304  private int nextCallId() {
305    int id, next;
306    do {
307      id = callIdCnt.get();
308      next = id < Integer.MAX_VALUE ? id + 1 : 0;
309    } while (!callIdCnt.compareAndSet(id, next));
310    return id;
311  }
312
313  /**
314   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
315   * threw an exception.
316   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
317   *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
318   *          new Connection each time.
319   * @return A pair with the Message response and the Cell data (if any).
320   */
321  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
322      Message param, Message returnType, final User ticket, final InetSocketAddress isa)
323      throws ServiceException {
324    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
325    callMethod(md, hrc, param, returnType, ticket, isa, done);
326    Message val;
327    try {
328      val = done.get();
329    } catch (IOException e) {
330      throw new ServiceException(e);
331    }
332    if (hrc.failed()) {
333      throw new ServiceException(hrc.getFailed());
334    } else {
335      return val;
336    }
337  }
338
339  /**
340   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
341   * given host/port are reused.
342   */
343  private T getConnection(ConnectionId remoteId) throws IOException {
344    if (failedServers.isFailedServer(remoteId.getAddress())) {
345      if (LOG.isDebugEnabled()) {
346        LOG.debug("Not trying to connect to " + remoteId.address
347            + " this server is in the failed servers list");
348      }
349      throw new FailedServerException(
350          "This server is in the failed servers list: " + remoteId.address);
351    }
352    T conn;
353    synchronized (connections) {
354      if (!running) {
355        throw new StoppedRpcClientException();
356      }
357      conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId));
358      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
359    }
360    return conn;
361  }
362
  /**
   * Create a new connection object for the given remote id. The returned connection is not
   * connected. This method is invoked from {@code getConnection} while the lock on the connection
   * pool is held.
   * @param remoteId identifies the connection to create
   * @return the new connection
   * @throws IOException if the connection object can not be created
   */
  protected abstract T createConnection(ConnectionId remoteId) throws IOException;
367
368  private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
369      RpcCallback<Message> callback) {
370    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
371    if (metrics != null) {
372      metrics.updateRpc(call.md, call.param, call.callStats);
373    }
374    if (LOG.isTraceEnabled()) {
375      LOG.trace(
376        "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
377    }
378    if (call.error != null) {
379      if (call.error instanceof RemoteException) {
380        call.error.fillInStackTrace();
381        hrc.setFailed(call.error);
382      } else {
383        hrc.setFailed(wrapException(addr, call.error));
384      }
385      callback.run(null);
386    } else {
387      hrc.setDone(call.cells);
388      callback.run(call.response);
389    }
390  }
391
392  Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
393      final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
394      final RpcCallback<Message> callback) {
395    final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
396    cs.setStartTime(EnvironmentEdgeManager.currentTime());
397
398    if (param instanceof ClientProtos.MultiRequest) {
399      ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
400      int numActions = 0;
401      for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
402        numActions += regionAction.getActionCount();
403      }
404
405      cs.setNumActionsPerServer(numActions);
406    }
407
408    final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
409    Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
410        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
411          @Override
412          public void run(Call call) {
413            counter.decrementAndGet();
414            onCallFinished(call, hrc, addr, callback);
415          }
416        }, cs);
417    ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
418    int count = counter.incrementAndGet();
419    try {
420      if (count > maxConcurrentCallsPerServer) {
421        throw new ServerTooBusyException(addr, count);
422      }
423      cs.setConcurrentCallsPerServer(count);
424      T connection = getConnection(remoteId);
425      connection.sendRequest(call, hrc);
426    } catch (Exception e) {
427      call.setException(toIOE(e));
428    }
429    return call;
430  }
431
432  InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
433    InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
434    if (addr.isUnresolved()) {
435      throw new UnknownHostException("can not resolve " + sn.getServerName());
436    }
437    return addr;
438  }
439
440  /**
441   * Interrupt the connections to the given ip:port server. This should be called if the server is
442   * known as actually dead. This will not prevent current operation to be retried, and, depending
443   * on their own behavior, they may retry on the same server. This can be a feature, for example at
444   * startup. In any case, they're likely to get connection refused (if the process died) or no
445   * route to host: i.e. their next retries should be faster and with a safe exception.
446   */
447  @Override
448  public void cancelConnections(ServerName sn) {
449    synchronized (connections) {
450      for (T connection : connections.values()) {
451        ConnectionId remoteId = connection.remoteId();
452        if (remoteId.address.getPort() == sn.getPort()
453            && remoteId.address.getHostName().equals(sn.getHostname())) {
454          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
455              + connection.remoteId);
456          connections.remove(remoteId, connection);
457          connection.shutdown();
458          connection.cleanupConnection();
459        }
460      }
461    }
462  }
463  /**
464   * Configure an hbase rpccontroller
465   * @param controller to configure
466   * @param channelOperationTimeout timeout for operation
467   * @return configured controller
468   */
469  static HBaseRpcController configureHBaseRpcController(
470      RpcController controller, int channelOperationTimeout) {
471    HBaseRpcController hrc;
472    if (controller != null && controller instanceof HBaseRpcController) {
473      hrc = (HBaseRpcController) controller;
474      if (!hrc.hasCallTimeout()) {
475        hrc.setCallTimeout(channelOperationTimeout);
476      }
477    } else {
478      hrc = new HBaseRpcControllerImpl();
479      hrc.setCallTimeout(channelOperationTimeout);
480    }
481    return hrc;
482  }
483
  /**
   * Implementation specific part of {@link #close()}. It is invoked once per client, after all
   * pooled connections have been shut down and before they are cleaned up.
   */
  protected abstract void closeInternal();
485
486  @Override
487  public void close() {
488    if (LOG.isDebugEnabled()) {
489      LOG.debug("Stopping rpc client");
490    }
491    Collection<T> connToClose;
492    synchronized (connections) {
493      if (!running) {
494        return;
495      }
496      running = false;
497      connToClose = connections.values();
498      connections.clear();
499    }
500    cleanupIdleConnectionTask.cancel(true);
501    for (T conn : connToClose) {
502      conn.shutdown();
503    }
504    closeInternal();
505    for (T conn : connToClose) {
506      conn.cleanupConnection();
507    }
508  }
509
510  @Override
511  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
512      int rpcTimeout) throws UnknownHostException {
513    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
514  }
515
516  @Override
517  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
518      throws UnknownHostException {
519    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
520  }
521
522  private static class AbstractRpcChannel {
523
524    protected final InetSocketAddress addr;
525
526    protected final AbstractRpcClient<?> rpcClient;
527
528    protected final User ticket;
529
530    protected final int rpcTimeout;
531
532    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
533        User ticket, int rpcTimeout) {
534      this.addr = addr;
535      this.rpcClient = rpcClient;
536      this.ticket = ticket;
537      this.rpcTimeout = rpcTimeout;
538    }
539
540    /**
541     * Configure an rpc controller
542     * @param controller to configure
543     * @return configured rpc controller
544     */
545    protected HBaseRpcController configureRpcController(RpcController controller) {
546      HBaseRpcController hrc;
547      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
548      // side. And now we may use ServerRpcController.
549      if (controller != null && controller instanceof HBaseRpcController) {
550        hrc = (HBaseRpcController) controller;
551        if (!hrc.hasCallTimeout()) {
552          hrc.setCallTimeout(rpcTimeout);
553        }
554      } else {
555        hrc = new HBaseRpcControllerImpl();
556        hrc.setCallTimeout(rpcTimeout);
557      }
558      return hrc;
559    }
560  }
561
562  /**
563   * Blocking rpc channel that goes via hbase rpc.
564   */
565  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
566      implements BlockingRpcChannel {
567
568    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
569        InetSocketAddress addr, User ticket, int rpcTimeout) {
570      super(rpcClient, addr, ticket, rpcTimeout);
571    }
572
573    @Override
574    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
575        Message param, Message returnType) throws ServiceException {
576      return rpcClient.callBlockingMethod(md, configureRpcController(controller),
577        param, returnType, ticket, addr);
578    }
579  }
580
581  /**
582   * Async rpc channel that goes via hbase rpc.
583   */
584  public static class RpcChannelImplementation extends AbstractRpcChannel implements
585      RpcChannel {
586
587    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
588        User ticket, int rpcTimeout) throws UnknownHostException {
589      super(rpcClient, addr, ticket, rpcTimeout);
590    }
591
592    @Override
593    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
594        Message param, Message returnType, RpcCallback<Message> done) {
595      // This method does not throw any exceptions, so the caller must provide a
596      // HBaseRpcController which is used to pass the exceptions.
597      this.rpcClient.callMethod(md,
598        configureRpcController(Preconditions.checkNotNull(controller,
599          "RpcController can not be null for async rpc call")),
600        param, returnType, ticket, addr, done);
601    }
602  }
603}