001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.ipc;
019
020import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
021import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
022
023import com.google.errorprone.annotations.RestrictedApi;
024import io.opentelemetry.api.trace.Span;
025import io.opentelemetry.api.trace.StatusCode;
026import io.opentelemetry.context.Scope;
027import java.io.IOException;
028import java.net.SocketAddress;
029import java.util.Collection;
030import java.util.Map;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.ScheduledFuture;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.client.MetricsConnection;
040import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
041import org.apache.hadoop.hbase.codec.Codec;
042import org.apache.hadoop.hbase.codec.KeyValueCodec;
043import org.apache.hadoop.hbase.net.Address;
044import org.apache.hadoop.hbase.security.User;
045import org.apache.hadoop.hbase.security.UserProvider;
046import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
047import org.apache.hadoop.hbase.trace.TraceUtil;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.util.PoolMap;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.io.compress.CompressionCodec;
052import org.apache.hadoop.ipc.RemoteException;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
058import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
059import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
060import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
062import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
063import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
064import org.apache.hbase.thirdparty.com.google.protobuf.Message;
065import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
066import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
067import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
068import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
069import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
070
071import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
072
073/**
074 * Provides the basics for a RpcClient implementation like configuration and Logging.
075 * <p>
076 * Locking schema of the current IPC implementation
077 * <ul>
078 * <li>There is a lock in {@link AbstractRpcClient} to protect the fetching or creating
079 * connection.</li>
080 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
081 * <li>The same for {@link HBaseRpcController} as {@link Call}. And see the comment of
082 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
083 * of how to deal with cancel.</li>
084 * <li>For connection implementation, the construction of a connection should be as fast as possible
085 * because the creation is protected under a lock. Connect to remote side when needed. There is no
086 * forced locking schema for a connection implementation.</li>
087 * <li>For the locking order, the {@link Call} and {@link HBaseRpcController}'s lock should be held
088 * at last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be execute
089 * outside the lock in {@link Call} and {@link HBaseRpcController} which means the implementations
090 * of the callbacks are free to hold any lock.</li>
091 * </ul>
092 * @since 2.0.0
093 */
094@InterfaceAudience.Private
095public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
096  // Log level is being changed in tests
097  public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
098
099  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
100    new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
101      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
102    10, TimeUnit.MILLISECONDS);
103
104  private static final ScheduledExecutorService IDLE_CONN_SWEEPER =
105    Executors.newScheduledThreadPool(1,
106      new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
107        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
108
109  private boolean running = true; // if client runs
110
111  protected final Configuration conf;
112  protected final Map<String, byte[]> connectionAttributes;
113  protected final String clusterId;
114  protected final SocketAddress localAddr;
115  protected final MetricsConnection metrics;
116
117  protected final UserProvider userProvider;
118  protected final CellBlockBuilder cellBlockBuilder;
119
120  protected final SaslClientAuthenticationProviders providers;
121
122  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
123  // time (in ms), it will be closed at any moment.
124  protected final int maxRetries; // the max. no. of retries for socket connections
125  protected final long failureSleep; // Time to sleep before retry on failure.
126  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
127  protected final boolean tcpKeepAlive; // if T then use keepalives
128  protected final Codec codec;
129  protected final CompressionCodec compressor;
130  protected final boolean fallbackAllowed;
131
132  protected final FailedServers failedServers;
133
134  protected final int connectTO;
135  protected final int readTO;
136  protected final int writeTO;
137
138  private final PoolMap<ConnectionId, T> connections;
139
140  private final AtomicInteger callIdCnt = new AtomicInteger(0);
141
142  private final ScheduledFuture<?> cleanupIdleConnectionTask;
143
144  private int maxConcurrentCallsPerServer;
145
146  private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
147    CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
148      .build(new CacheLoader<Address, AtomicInteger>() {
149        @Override
150        public AtomicInteger load(Address key) throws Exception {
151          return new AtomicInteger(0);
152        }
153      });
154
155  /**
156   * Construct an IPC client for the cluster <code>clusterId</code>
157   * @param conf      configuration
158   * @param clusterId the cluster id
159   * @param localAddr client socket bind address.
160   * @param metrics   the connection metrics
161   */
162  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
163    MetricsConnection metrics, Map<String, byte[]> connectionAttributes) {
164    this.userProvider = UserProvider.instantiate(conf);
165    this.localAddr = localAddr;
166    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
167    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
168    this.failureSleep =
169      conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
170    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
171    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
172    this.cellBlockBuilder = new CellBlockBuilder(conf);
173
174    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
175    this.conf = conf;
176    this.connectionAttributes = connectionAttributes;
177    this.codec = getCodec();
178    this.compressor = getCompressor(conf);
179    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
180      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
181    this.failedServers = new FailedServers(conf);
182    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
183    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
184    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
185    this.metrics = metrics;
186    this.maxConcurrentCallsPerServer =
187      conf.getInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
188        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
189
190    this.providers = new SaslClientAuthenticationProviders(conf);
191
192    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
193
194    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {
195
196      @Override
197      public void run() {
198        cleanupIdleConnections();
199      }
200    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);
201
202    if (LOG.isDebugEnabled()) {
203      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
204        + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
205        + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
206        + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
207        + this.fallbackAllowed + ", bind address="
208        + (this.localAddr != null ? this.localAddr : "null"));
209    }
210  }
211
212  private void cleanupIdleConnections() {
213    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
214    synchronized (connections) {
215      for (T conn : connections.values()) {
216        // Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
217        // connection itself has already shutdown. The latter check is because we may still
218        // have some pending calls on connection, so we should not shut down the connection outside.
219        // The connection itself will disconnect if there is no pending call for maxIdleTime.
220        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
221          if (LOG.isTraceEnabled()) {
222            LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress());
223          }
224          connections.remove(conn.remoteId(), conn);
225          conn.cleanupConnection();
226        }
227      }
228    }
229  }
230
231  public static String getDefaultCodec(final Configuration c) {
232    // If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
233    // Configuration will complain -- then no default codec (and we'll pb everything). Else
234    // default is KeyValueCodec
235    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
236  }
237
238  /**
239   * Encapsulate the ugly casting and RuntimeException conversion in private method.
240   * @return Codec to use on this client.
241   */
242  protected Codec getCodec() {
243    // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
244    // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
245    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
246    if (className == null || className.length() == 0) {
247      return null;
248    }
249    try {
250      return Class.forName(className).asSubclass(Codec.class).getDeclaredConstructor()
251        .newInstance();
252    } catch (Exception e) {
253      throw new RuntimeException("Failed getting codec " + className, e);
254    }
255  }
256
257  @Override
258  public boolean hasCellBlockSupport() {
259    return this.codec != null;
260  }
261
262  // for writing tests that want to throw exception when connecting.
263  protected boolean isTcpNoDelay() {
264    return tcpNoDelay;
265  }
266
267  /**
268   * Encapsulate the ugly casting and RuntimeException conversion in private method.
269   * @param conf configuration
270   * @return The compressor to use on this client.
271   */
272  private static CompressionCodec getCompressor(final Configuration conf) {
273    String className = conf.get("hbase.client.rpc.compressor", null);
274    if (className == null || className.isEmpty()) {
275      return null;
276    }
277    try {
278      return Class.forName(className).asSubclass(CompressionCodec.class).getDeclaredConstructor()
279        .newInstance();
280    } catch (Exception e) {
281      throw new RuntimeException("Failed getting compressor " + className, e);
282    }
283  }
284
285  /**
286   * Return the pool type specified in the configuration, which must be set to either
287   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
288   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}, otherwise default to the
289   * former. For applications with many user threads, use a small round-robin pool. For applications
290   * with few user threads, you may want to try using a thread-local pool. In any case, the number
291   * of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed the operating
292   * system's hard limit on the number of connections.
293   * @param config configuration
294   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
295   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
296   */
297  private static PoolMap.PoolType getPoolType(Configuration config) {
298    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
299      PoolMap.PoolType.RoundRobin);
300  }
301
302  /**
303   * Return the pool size specified in the configuration, which is applicable only if the pool type
304   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
305   * @param config configuration
306   * @return the maximum pool size
307   */
308  private static int getPoolSize(Configuration config) {
309    int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
310
311    if (poolSize <= 0) {
312      LOG.warn("{} must be positive. Using default value: 1",
313        HConstants.HBASE_CLIENT_IPC_POOL_SIZE);
314      return 1;
315    } else {
316      return poolSize;
317    }
318  }
319
320  private int nextCallId() {
321    int id, next;
322    do {
323      id = callIdCnt.get();
324      next = id < Integer.MAX_VALUE ? id + 1 : 0;
325    } while (!callIdCnt.compareAndSet(id, next));
326    return id;
327  }
328
329  /**
330   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
331   * threw an exception.
332   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
333   *               {@link UserProvider#getCurrent()} makes a new instance of User each time so will
334   *               be a new Connection each time.
335   * @return A pair with the Message response and the Cell data (if any).
336   */
337  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
338    Message param, Message returnType, final User ticket, final Address isa)
339    throws ServiceException {
340    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
341    callMethod(md, hrc, param, returnType, ticket, isa, done);
342    Message val;
343    try {
344      val = done.get();
345    } catch (IOException e) {
346      throw new ServiceException(e);
347    }
348    if (hrc.failed()) {
349      throw new ServiceException(hrc.getFailed());
350    } else {
351      return val;
352    }
353  }
354
355  /**
356   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
357   * given host/port are reused.
358   */
359  private T getConnection(ConnectionId remoteId) throws IOException {
360    if (failedServers.isFailedServer(remoteId.getAddress())) {
361      if (LOG.isDebugEnabled()) {
362        LOG.debug("Not trying to connect to " + remoteId.getAddress()
363          + " this server is in the failed servers list");
364      }
365      throw new FailedServerException(
366        "This server is in the failed servers list: " + remoteId.getAddress());
367    }
368    T conn;
369    synchronized (connections) {
370      if (!running) {
371        throw new StoppedRpcClientException();
372      }
373      conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId));
374      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
375    }
376    return conn;
377  }
378
379  /**
380   * Not connected.
381   */
382  protected abstract T createConnection(ConnectionId remoteId) throws IOException;
383
384  private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
385    RpcCallback<Message> callback) {
386    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
387    if (metrics != null) {
388      metrics.updateRpc(call.md, hrc.getTableName(), call.param, call.callStats, call.error);
389    }
390    if (LOG.isTraceEnabled()) {
391      LOG.trace("CallId: {}, call: {}, startTime: {}ms, callTime: {}ms, status: {}", call.id,
392        call.md.getName(), call.getStartTime(), call.callStats.getCallTimeMs(),
393        call.error != null ? "failed" : "successful");
394    }
395    if (call.error != null) {
396      if (call.error instanceof RemoteException) {
397        call.error.fillInStackTrace();
398        hrc.setFailed(call.error);
399      } else {
400        hrc.setFailed(wrapException(addr, hrc.getRegionInfo(), call.error));
401      }
402      callback.run(null);
403    } else {
404      hrc.setDone(call.cells);
405      callback.run(call.response);
406    }
407  }
408
409  private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
410    final Message param, Message returnType, final User ticket, final Address addr,
411    final RpcCallback<Message> callback) {
412    Span span = new IpcClientSpanBuilder().setMethodDescriptor(md).setRemoteAddress(addr).build();
413    try (Scope scope = span.makeCurrent()) {
414      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
415      cs.setStartTime(EnvironmentEdgeManager.currentTime());
416
417      if (param instanceof ClientProtos.MultiRequest) {
418        ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
419        int numActions = 0;
420        for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
421          numActions += regionAction.getActionCount();
422        }
423
424        cs.setNumActionsPerServer(numActions);
425      }
426
427      final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
428      Call call =
429        new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(),
430          hrc.getPriority(), hrc.getRequestAttributes(), new RpcCallback<Call>() {
431            @Override
432            public void run(Call call) {
433              try (Scope scope = call.span.makeCurrent()) {
434                counter.decrementAndGet();
435                onCallFinished(call, hrc, addr, callback);
436              } finally {
437                if (hrc.failed()) {
438                  TraceUtil.setError(span, hrc.getFailed());
439                } else {
440                  span.setStatus(StatusCode.OK);
441                }
442                span.end();
443              }
444            }
445          }, cs);
446      ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
447      int count = counter.incrementAndGet();
448      try {
449        if (count > maxConcurrentCallsPerServer) {
450          throw new ServerTooBusyException(addr, count);
451        }
452        cs.setConcurrentCallsPerServer(count);
453        T connection = getConnection(remoteId);
454        connection.sendRequest(call, hrc);
455      } catch (Exception e) {
456        call.setException(toIOE(e));
457        span.end();
458      }
459      return call;
460    }
461  }
462
463  static Address createAddr(ServerName sn) {
464    return Address.fromParts(sn.getHostname(), sn.getPort());
465  }
466
467  /**
468   * Interrupt the connections to the given ip:port server. This should be called if the server is
469   * known as actually dead. This will not prevent current operation to be retried, and, depending
470   * on their own behavior, they may retry on the same server. This can be a feature, for example at
471   * startup. In any case, they're likely to get connection refused (if the process died) or no
472   * route to host: i.e. their next retries should be faster and with a safe exception.
473   */
474  @Override
475  public void cancelConnections(ServerName sn) {
476    synchronized (connections) {
477      for (T connection : connections.values()) {
478        ConnectionId remoteId = connection.remoteId();
479        if (
480          remoteId.getAddress().getPort() == sn.getPort()
481            && remoteId.getAddress().getHostName().equals(sn.getHostname())
482        ) {
483          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
484            + connection.remoteId);
485          connections.remove(remoteId, connection);
486          connection.shutdown();
487          connection.cleanupConnection();
488        }
489      }
490    }
491  }
492
493  /**
494   * Configure an hbase rpccontroller
495   * @param controller              to configure
496   * @param channelOperationTimeout timeout for operation
497   * @return configured controller
498   */
499  static HBaseRpcController configureHBaseRpcController(RpcController controller,
500    int channelOperationTimeout) {
501    HBaseRpcController hrc;
502    if (controller != null && controller instanceof HBaseRpcController) {
503      hrc = (HBaseRpcController) controller;
504      if (!hrc.hasCallTimeout()) {
505        hrc.setCallTimeout(channelOperationTimeout);
506      }
507    } else {
508      hrc = new HBaseRpcControllerImpl();
509      hrc.setCallTimeout(channelOperationTimeout);
510    }
511    return hrc;
512  }
513
514  protected abstract void closeInternal();
515
516  @Override
517  public void close() {
518    if (LOG.isDebugEnabled()) {
519      LOG.debug("Stopping rpc client");
520    }
521    Collection<T> connToClose;
522    synchronized (connections) {
523      if (!running) {
524        return;
525      }
526      running = false;
527      connToClose = connections.values();
528      connections.clear();
529    }
530    cleanupIdleConnectionTask.cancel(true);
531    for (T conn : connToClose) {
532      conn.shutdown();
533    }
534    closeInternal();
535    for (T conn : connToClose) {
536      conn.cleanupConnection();
537    }
538  }
539
540  @Override
541  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
542    int rpcTimeout) {
543    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
544  }
545
546  @Override
547  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
548    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
549  }
550
551  @RestrictedApi(explanation = "Should only be called in tests", link = "",
552      allowedOnPath = ".*/src/test/.*")
553  PoolMap<ConnectionId, T> getConnections() {
554    return connections;
555  }
556
557  private static class AbstractRpcChannel {
558
559    protected final Address addr;
560
561    protected final AbstractRpcClient<?> rpcClient;
562
563    protected final User ticket;
564
565    protected final int rpcTimeout;
566
567    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr, User ticket,
568      int rpcTimeout) {
569      this.addr = addr;
570      this.rpcClient = rpcClient;
571      this.ticket = ticket;
572      this.rpcTimeout = rpcTimeout;
573    }
574
575    /**
576     * Configure an rpc controller
577     * @param controller to configure
578     * @return configured rpc controller
579     */
580    protected HBaseRpcController configureRpcController(RpcController controller) {
581      HBaseRpcController hrc;
582      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
583      // side. And now we may use ServerRpcController.
584      if (controller != null && controller instanceof HBaseRpcController) {
585        hrc = (HBaseRpcController) controller;
586        if (!hrc.hasCallTimeout()) {
587          hrc.setCallTimeout(rpcTimeout);
588        }
589      } else {
590        hrc = new HBaseRpcControllerImpl();
591        hrc.setCallTimeout(rpcTimeout);
592      }
593      return hrc;
594    }
595  }
596
597  /**
598   * Blocking rpc channel that goes via hbase rpc.
599   */
600  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
601    implements BlockingRpcChannel {
602
603    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
604      User ticket, int rpcTimeout) {
605      super(rpcClient, addr, ticket, rpcTimeout);
606    }
607
608    @Override
609    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
610      Message param, Message returnType) throws ServiceException {
611      return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
612        ticket, addr);
613    }
614  }
615
616  /**
617   * Async rpc channel that goes via hbase rpc.
618   */
619  public static class RpcChannelImplementation extends AbstractRpcChannel implements RpcChannel {
620
621    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr, User ticket,
622      int rpcTimeout) {
623      super(rpcClient, addr, ticket, rpcTimeout);
624    }
625
626    @Override
627    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param,
628      Message returnType, RpcCallback<Message> done) {
629      HBaseRpcController configuredController = configureRpcController(
630        Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
631      // This method does not throw any exceptions, so the caller must provide a
632      // HBaseRpcController which is used to pass the exceptions.
633      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
634    }
635  }
636}