001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
021import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
022
023import io.opentelemetry.api.trace.Span;
024import io.opentelemetry.api.trace.StatusCode;
025import io.opentelemetry.context.Scope;
026import java.io.IOException;
027import java.net.SocketAddress;
028import java.util.Collection;
029import java.util.concurrent.Executors;
030import java.util.concurrent.ScheduledExecutorService;
031import java.util.concurrent.ScheduledFuture;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicInteger;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.client.MetricsConnection;
038import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
039import org.apache.hadoop.hbase.codec.Codec;
040import org.apache.hadoop.hbase.codec.KeyValueCodec;
041import org.apache.hadoop.hbase.net.Address;
042import org.apache.hadoop.hbase.security.User;
043import org.apache.hadoop.hbase.security.UserProvider;
044import org.apache.hadoop.hbase.trace.TraceUtil;
045import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
046import org.apache.hadoop.hbase.util.PoolMap;
047import org.apache.hadoop.hbase.util.Threads;
048import org.apache.hadoop.io.compress.CompressionCodec;
049import org.apache.hadoop.ipc.RemoteException;
050import org.apache.yetus.audience.InterfaceAudience;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
055import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
056import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
057import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
059import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
060import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
061import org.apache.hbase.thirdparty.com.google.protobuf.Message;
062import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
063import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
064import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
065import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
066import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
067
068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
069
070/**
071 * Provides the basics for a RpcClient implementation like configuration and Logging.
072 * <p>
073 * Locking schema of the current IPC implementation
074 * <ul>
075 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating
076 * connection.</li>
077 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
078 * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
079 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
080 * of how to deal with cancel.</li>
081 * <li>For connection implementation, the construction of a connection should be as fast as possible
082 * because the creation is protected under a lock. Connect to remote side when needed. There is no
083 * forced locking schema for a connection implementation.</li>
 * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held
 * last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
 * outside the lock in {@link Call} and {@link HBaseRpcController}, which means the implementations
 * of the callbacks are free to hold any lock.</li>
088 * </ul>
089 * @since 2.0.0
090 */
091@InterfaceAudience.Private
092public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
  // Log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);

  /**
   * Process-wide wheel timer with a 10 ms tick, backed by daemon threads named
   * "RpcClient-timer-pool-N". It is not used inside this class.
   */
  // NOTE(review): presumably exposed so subclasses/connections can schedule timeouts -- confirm.
  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
    new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
    10, TimeUnit.MILLISECONDS);

  /**
   * Single-threaded, daemon scheduled executor shared by every client instance. The constructor
   * registers one periodic {@link #cleanupIdleConnections()} task on it per client.
   */
  private static final ScheduledExecutorService IDLE_CONN_SWEEPER =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

  // Whether the client is still running. Read in getConnection() and flipped to false in close(),
  // both while holding the monitor of "connections".
  private boolean running = true; // if client runs

  protected final Configuration conf;
  // Never null: the constructor substitutes HConstants.CLUSTER_ID_DEFAULT for a null argument.
  protected final String clusterId;
  // Client socket bind address; may be null (the constructor's debug log handles that case).
  protected final SocketAddress localAddr;
  // May be null; onCallFinished() only reports to it when it is non-null.
  protected final MetricsConnection metrics;

  protected final UserProvider userProvider;
  protected final CellBlockBuilder cellBlockBuilder;

  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
  // time (in ms), it will be closed at any moment.
  protected final int maxRetries; // the max. no. of retries for socket connections
  protected final long failureSleep; // Time to sleep before retry on failure.
  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives
  // Null when no codec is configured; see getCodec() and hasCellBlockSupport().
  protected final Codec codec;
  // Null when "hbase.client.rpc.compressor" is unset or empty; see getCompressor().
  protected final CompressionCodec compressor;
  protected final boolean fallbackAllowed;

  // Consulted by getConnection() to fail fast for servers currently listed as failed.
  protected final FailedServers failedServers;

  // Socket timeouts read from SOCKET_TIMEOUT_CONNECT / _READ / _WRITE in the constructor.
  protected final int connectTO;
  protected final int readTO;
  protected final int writeTO;

  // Pool of connections keyed by ConnectionId. Within this class it is accessed while
  // synchronized on itself.
  private final PoolMap<ConnectionId, T> connections;

  // Source of call ids; nextCallId() wraps it back to 0 after Integer.MAX_VALUE.
  private final AtomicInteger callIdCnt = new AtomicInteger(0);

  // Handle of the periodic idle-connection sweep; cancelled in close().
  private final ScheduledFuture<?> cleanupIdleConnectionTask;

  // Upper bound on concurrent calls to a single server address, enforced in callMethod().
  private int maxConcurrentCallsPerServer;

  /**
   * Per-address counter of calls currently in flight, incremented in callMethod() and decremented
   * when a call completes. The cache is static, so the counters are shared by all client instances
   * in the JVM; an entry expires one hour after it was last accessed.
   */
  private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
    CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
      .build(new CacheLoader<Address, AtomicInteger>() {
        @Override
        public AtomicInteger load(Address key) throws Exception {
          return new AtomicInteger(0);
        }
      });
148
149  /**
150   * Construct an IPC client for the cluster <code>clusterId</code>
151   * @param conf      configuration
152   * @param clusterId the cluster id
153   * @param localAddr client socket bind address.
154   * @param metrics   the connection metrics
155   */
156  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
157    MetricsConnection metrics) {
158    this.userProvider = UserProvider.instantiate(conf);
159    this.localAddr = localAddr;
160    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
161    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
162    this.failureSleep =
163      conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
164    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
165    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
166    this.cellBlockBuilder = new CellBlockBuilder(conf);
167
168    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
169    this.conf = conf;
170    this.codec = getCodec();
171    this.compressor = getCompressor(conf);
172    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
173      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
174    this.failedServers = new FailedServers(conf);
175    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
176    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
177    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
178    this.metrics = metrics;
179    this.maxConcurrentCallsPerServer =
180      conf.getInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
181        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
182
183    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
184
185    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {
186
187      @Override
188      public void run() {
189        cleanupIdleConnections();
190      }
191    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
192
193    if (LOG.isDebugEnabled()) {
194      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
195        + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
196        + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
197        + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
198        + this.fallbackAllowed + ", bind address="
199        + (this.localAddr != null ? this.localAddr : "null"));
200    }
201  }
202
203  private void cleanupIdleConnections() {
204    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
205    synchronized (connections) {
206      for (T conn : connections.values()) {
207        // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
208        // connection itself has already shutdown. The latter check is because we may still
209        // have some pending calls on connection so we should not shutdown the connection outside.
210        // The connection itself will disconnect if there is no pending call for maxIdleTime.
211        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
212          if (LOG.isTraceEnabled()) {
213            LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress());
214          }
215          connections.remove(conn.remoteId(), conn);
216          conn.cleanupConnection();
217        }
218      }
219    }
220  }
221
222  public static String getDefaultCodec(final Configuration c) {
223    // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
224    // Configuration will complain -- then no default codec (and we'll pb everything). Else
225    // default is KeyValueCodec
226    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
227  }
228
229  /**
230   * Encapsulate the ugly casting and RuntimeException conversion in private method.
231   * @return Codec to use on this client.
232   */
233  Codec getCodec() {
234    // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
235    // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
236    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
237    if (className == null || className.length() == 0) {
238      return null;
239    }
240    try {
241      return Class.forName(className).asSubclass(Codec.class).getDeclaredConstructor()
242        .newInstance();
243    } catch (Exception e) {
244      throw new RuntimeException("Failed getting codec " + className, e);
245    }
246  }
247
248  @Override
249  public boolean hasCellBlockSupport() {
250    return this.codec != null;
251  }
252
253  // for writing tests that want to throw exception when connecting.
254  boolean isTcpNoDelay() {
255    return tcpNoDelay;
256  }
257
258  /**
259   * Encapsulate the ugly casting and RuntimeException conversion in private method.
260   * @param conf configuration
261   * @return The compressor to use on this client.
262   */
263  private static CompressionCodec getCompressor(final Configuration conf) {
264    String className = conf.get("hbase.client.rpc.compressor", null);
265    if (className == null || className.isEmpty()) {
266      return null;
267    }
268    try {
269      return Class.forName(className).asSubclass(CompressionCodec.class).getDeclaredConstructor()
270        .newInstance();
271    } catch (Exception e) {
272      throw new RuntimeException("Failed getting compressor " + className, e);
273    }
274  }
275
276  /**
277   * Return the pool type specified in the configuration, which must be set to either
278   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
279   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
280   * former. For applications with many user threads, use a small round-robin pool. For applications
281   * with few user threads, you may want to try using a thread-local pool. In any case, the number
282   * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
283   * system's hard limit on the number of connections.
284   * @param config configuration
285   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
286   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
287   */
288  private static PoolMap.PoolType getPoolType(Configuration config) {
289    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
290      PoolMap.PoolType.RoundRobin);
291  }
292
293  /**
294   * Return the pool size specified in the configuration, which is applicable only if the pool type
295   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
296   * @param config configuration
297   * @return the maximum pool size
298   */
299  private static int getPoolSize(Configuration config) {
300    int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
301
302    if (poolSize <= 0) {
303      LOG.warn("{} must be positive. Using default value: 1",
304        HConstants.HBASE_CLIENT_IPC_POOL_SIZE);
305      return 1;
306    } else {
307      return poolSize;
308    }
309  }
310
311  private int nextCallId() {
312    int id, next;
313    do {
314      id = callIdCnt.get();
315      next = id < Integer.MAX_VALUE ? id + 1 : 0;
316    } while (!callIdCnt.compareAndSet(id, next));
317    return id;
318  }
319
320  /**
321   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
322   * threw an exception.
323   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
324   *               {@link UserProvider#getCurrent()} makes a new instance of User each time so will
325   *               be a new Connection each time.
326   * @return A pair with the Message response and the Cell data (if any).
327   */
328  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
329    Message param, Message returnType, final User ticket, final Address isa)
330    throws ServiceException {
331    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
332    callMethod(md, hrc, param, returnType, ticket, isa, done);
333    Message val;
334    try {
335      val = done.get();
336    } catch (IOException e) {
337      throw new ServiceException(e);
338    }
339    if (hrc.failed()) {
340      throw new ServiceException(hrc.getFailed());
341    } else {
342      return val;
343    }
344  }
345
346  /**
347   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
348   * given host/port are reused.
349   */
350  private T getConnection(ConnectionId remoteId) throws IOException {
351    if (failedServers.isFailedServer(remoteId.getAddress())) {
352      if (LOG.isDebugEnabled()) {
353        LOG.debug("Not trying to connect to " + remoteId.getAddress()
354          + " this server is in the failed servers list");
355      }
356      throw new FailedServerException(
357        "This server is in the failed servers list: " + remoteId.getAddress());
358    }
359    T conn;
360    synchronized (connections) {
361      if (!running) {
362        throw new StoppedRpcClientException();
363      }
364      conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId));
365      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
366    }
367    return conn;
368  }
369
  /**
   * Creates a new connection object for the given remote id. The returned connection is not
   * connected yet. This is invoked from {@link #getConnection(ConnectionId)} while the lock on the
   * connection pool is held, so implementations should return as fast as possible and connect to
   * the remote side only when needed.
   * @param remoteId identifies the connection to create
   * @return a new connection for <code>remoteId</code>
   * @throws IOException if the connection object can not be created
   */
  protected abstract T createConnection(ConnectionId remoteId) throws IOException;
374
375  private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
376    RpcCallback<Message> callback) {
377    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
378    if (metrics != null) {
379      metrics.updateRpc(call.md, call.param, call.callStats);
380    }
381    if (LOG.isTraceEnabled()) {
382      LOG.trace("CallId: {}, call: {}, startTime: {}ms, callTime: {}ms", call.id, call.md.getName(),
383        call.getStartTime(), call.callStats.getCallTimeMs());
384    }
385    if (call.error != null) {
386      if (call.error instanceof RemoteException) {
387        call.error.fillInStackTrace();
388        hrc.setFailed(call.error);
389      } else {
390        hrc.setFailed(wrapException(addr, hrc.getRegionInfo(), call.error));
391      }
392      callback.run(null);
393    } else {
394      hrc.setDone(call.cells);
395      callback.run(call.response);
396    }
397  }
398
  /**
   * Starts an rpc call: builds the {@link Call}, applies the per-server concurrency limit, obtains
   * a connection and hands the call to it. This method does not throw; any exception raised while
   * admitting or sending the call is converted with <code>toIOE</code> and set on the call.
   * @param md         descriptor of the method to invoke
   * @param hrc        controller supplying the cell scanner, call timeout and priority, and
   *                   receiving the outcome when the call completes
   * @param param      request message
   * @param returnType passed through to the {@link Call}
   * @param ticket     user the call is made as; part of the connection id
   * @param addr       address of the remote server
   * @param callback   invoked via {@link #onCallFinished} when the call completes
   * @return the created call
   */
  private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
    final Message param, Message returnType, final User ticket, final Address addr,
    final RpcCallback<Message> callback) {
    // One client span per call; it is ended in the completion callback below.
    Span span = new IpcClientSpanBuilder().setMethodDescriptor(md).setRemoteAddress(addr).build();
    try (Scope scope = span.makeCurrent()) {
      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
      cs.setStartTime(EnvironmentEdgeManager.currentTime());

      // For multi requests, record the total number of actions across all region actions.
      if (param instanceof ClientProtos.MultiRequest) {
        ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
        int numActions = 0;
        for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
          numActions += regionAction.getActionCount();
        }

        cs.setNumActionsPerServer(numActions);
      }

      // Counter of in-flight calls for this address, shared through the static cache.
      final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
      Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
          @Override
          public void run(Call call) {
            // Completion callback: release the concurrency slot, publish the result, then close
            // the span with a status taken from the controller.
            try (Scope scope = call.span.makeCurrent()) {
              counter.decrementAndGet();
              onCallFinished(call, hrc, addr, callback);
            } finally {
              if (hrc.failed()) {
                TraceUtil.setError(span, hrc.getFailed());
              } else {
                span.setStatus(StatusCode.OK);
              }
              span.end();
            }
          }
        }, cs);
      ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
      // The slot is taken before the limit check; the completion callback gives it back.
      int count = counter.incrementAndGet();
      try {
        if (count > maxConcurrentCallsPerServer) {
          throw new ServerTooBusyException(addr, count);
        }
        cs.setConcurrentCallsPerServer(count);
        T connection = getConnection(remoteId);
        connection.sendRequest(call, hrc);
      } catch (Exception e) {
        call.setException(toIOE(e));
        // NOTE(review): if Call.setException runs the completion callback above, the span has
        // already been ended there and this end() is redundant -- confirm against Call.
        span.end();
      }
      return call;
    }
  }
451
452  private static Address createAddr(ServerName sn) {
453    return Address.fromParts(sn.getHostname(), sn.getPort());
454  }
455
456  /**
457   * Interrupt the connections to the given ip:port server. This should be called if the server is
458   * known as actually dead. This will not prevent current operation to be retried, and, depending
459   * on their own behavior, they may retry on the same server. This can be a feature, for example at
460   * startup. In any case, they're likely to get connection refused (if the process died) or no
461   * route to host: i.e. their next retries should be faster and with a safe exception.
462   */
463  @Override
464  public void cancelConnections(ServerName sn) {
465    synchronized (connections) {
466      for (T connection : connections.values()) {
467        ConnectionId remoteId = connection.remoteId();
468        if (
469          remoteId.getAddress().getPort() == sn.getPort()
470            && remoteId.getAddress().getHostName().equals(sn.getHostname())
471        ) {
472          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
473            + connection.remoteId);
474          connections.remove(remoteId, connection);
475          connection.shutdown();
476          connection.cleanupConnection();
477        }
478      }
479    }
480  }
481
482  /**
483   * Configure an hbase rpccontroller
484   * @param controller              to configure
485   * @param channelOperationTimeout timeout for operation
486   * @return configured controller
487   */
488  static HBaseRpcController configureHBaseRpcController(RpcController controller,
489    int channelOperationTimeout) {
490    HBaseRpcController hrc;
491    if (controller != null && controller instanceof HBaseRpcController) {
492      hrc = (HBaseRpcController) controller;
493      if (!hrc.hasCallTimeout()) {
494        hrc.setCallTimeout(channelOperationTimeout);
495      }
496    } else {
497      hrc = new HBaseRpcControllerImpl();
498      hrc.setCallTimeout(channelOperationTimeout);
499    }
500    return hrc;
501  }
502
  /**
   * Implementation specific part of {@link #close()}. It is called once per client, after every
   * pooled connection has been shut down and before the connections are cleaned up.
   */
  protected abstract void closeInternal();
504
505  @Override
506  public void close() {
507    if (LOG.isDebugEnabled()) {
508      LOG.debug("Stopping rpc client");
509    }
510    Collection<T> connToClose;
511    synchronized (connections) {
512      if (!running) {
513        return;
514      }
515      running = false;
516      connToClose = connections.values();
517      connections.clear();
518    }
519    cleanupIdleConnectionTask.cancel(true);
520    for (T conn : connToClose) {
521      conn.shutdown();
522    }
523    closeInternal();
524    for (T conn : connToClose) {
525      conn.cleanupConnection();
526    }
527  }
528
529  @Override
530  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
531    int rpcTimeout) {
532    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
533  }
534
535  @Override
536  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
537    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
538  }
539
540  private static class AbstractRpcChannel {
541
542    protected final Address addr;
543
544    protected final AbstractRpcClient<?> rpcClient;
545
546    protected final User ticket;
547
548    protected final int rpcTimeout;
549
550    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr, User ticket,
551      int rpcTimeout) {
552      this.addr = addr;
553      this.rpcClient = rpcClient;
554      this.ticket = ticket;
555      this.rpcTimeout = rpcTimeout;
556    }
557
558    /**
559     * Configure an rpc controller
560     * @param controller to configure
561     * @return configured rpc controller
562     */
563    protected HBaseRpcController configureRpcController(RpcController controller) {
564      HBaseRpcController hrc;
565      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
566      // side. And now we may use ServerRpcController.
567      if (controller != null && controller instanceof HBaseRpcController) {
568        hrc = (HBaseRpcController) controller;
569        if (!hrc.hasCallTimeout()) {
570          hrc.setCallTimeout(rpcTimeout);
571        }
572      } else {
573        hrc = new HBaseRpcControllerImpl();
574        hrc.setCallTimeout(rpcTimeout);
575      }
576      return hrc;
577    }
578  }
579
580  /**
581   * Blocking rpc channel that goes via hbase rpc.
582   */
583  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
584    implements BlockingRpcChannel {
585
586    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
587      User ticket, int rpcTimeout) {
588      super(rpcClient, addr, ticket, rpcTimeout);
589    }
590
591    @Override
592    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
593      Message param, Message returnType) throws ServiceException {
594      return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
595        ticket, addr);
596    }
597  }
598
599  /**
600   * Async rpc channel that goes via hbase rpc.
601   */
602  public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel {
603
604    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr, User ticket,
605      int rpcTimeout) {
606      super(rpcClient, addr, ticket, rpcTimeout);
607    }
608
609    @Override
610    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param,
611      Message returnType, RpcCallback<Message> done) {
612      HBaseRpcController configuredController = configureRpcController(
613        Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
614      // This method does not throw any exceptions, so the caller must provide a
615      // HBaseRpcController which is used to pass the exceptions.
616      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
617    }
618  }
619}