/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * Provides the basics for an RpcClient implementation, such as configuration and logging.
 * <p>
 * Locking schema of the current IPC implementation:
 * <ul>
 * <li>There is a lock in {@link AbstractRpcClient} to protect fetching or creating
 * connections.</li>
 * <li>There is a lock in {@link Call} to make sure that we can only finish the call once.</li>
 * <li>The same applies to {@link HBaseRpcController} as to {@link Call}. And see the comment of
 * {@link HBaseRpcController#notifyOnCancel(RpcCallback, HBaseRpcController.CancellationCallback)}
 * on how to deal with cancellation.</li>
 * <li>For connection implementations, the construction of a connection should be as fast as
 * possible because the creation is protected under a lock. Connect to the remote side only when
 * needed. There is no forced locking schema for a connection implementation.</li>
 * <li>For the locking order, the locks in {@link Call} and {@link HBaseRpcController} should be
 * held last. So the callbacks in {@link Call} and {@link HBaseRpcController} should be executed
 * outside the locks in {@link Call} and {@link HBaseRpcController}, which means the
 * implementations of the callbacks are free to hold any lock.</li>
 * </ul>
 * @since 2.0.0
 */
@InterfaceAudience.Private
public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcClient {
  // Log level is being changed in tests
  public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);

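  // Shared hashed wheel timer (10 ms tick) handed to connections for scheduling time-based work
  // such as call timeouts. Static, so it is shared by all RpcClient instances in this JVM.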
  protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
    new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
    10, TimeUnit.MILLISECONDS);

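  // Single daemon thread that periodically runs cleanupIdleConnections(); the task is scheduled
  // in the constructor at a minIdleTimeBeforeClose interval.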
  private static final ScheduledExecutorService IDLE_CONN_SWEEPER =
    Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

  private boolean running = true; // if client runs

  protected final Configuration conf;
  protected final String clusterId;
  protected final SocketAddress localAddr;
  protected final MetricsConnection metrics;

  protected final UserProvider userProvider;
  protected final CellBlockBuilder cellBlockBuilder;

  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
  // time (in ms), it will be closed at any moment.
  protected final int maxRetries; // the max. no. of retries for socket connections
  protected final long failureSleep; // Time to sleep before retry on failure.
  protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  protected final boolean tcpKeepAlive; // if T then use keepalives
  protected final Codec codec;
  protected final CompressionCodec compressor;
  protected final boolean fallbackAllowed;

  protected final FailedServers failedServers;

  protected final int connectTO;
  protected final int readTO;
  protected final int writeTO;

  private final PoolMap<ConnectionId, T> connections;

  private final AtomicInteger callIdCnt = new AtomicInteger(0);

  private final ScheduledFuture<?> cleanupIdleConnectionTask;

  private int maxConcurrentCallsPerServer;

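  // Per-server counters of in-flight calls, used to enforce the
  // HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD limit below; entries expire after an
  // hour without access.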
  private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
      CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
          build(new CacheLoader<Address, AtomicInteger>() {
            @Override public AtomicInteger load(Address key) throws Exception {
              return new AtomicInteger(0);
            }
          });

  /**
   * Construct an IPC client for the cluster <code>clusterId</code>
   * @param conf configuration
   * @param clusterId the cluster id
   * @param localAddr client socket bind address.
   * @param metrics the connection metrics
   */
  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
      MetricsConnection metrics) {
    this.userProvider = UserProvider.instantiate(conf);
    this.localAddr = localAddr;
    this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
    this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
    this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
    this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
    this.cellBlockBuilder = new CellBlockBuilder(conf);

    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
    this.conf = conf;
    this.codec = getCodec();
    this.compressor = getCompressor(conf);
    this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
      IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
    this.failedServers = new FailedServers(conf);
    this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
    this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
    this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
    this.metrics = metrics;
    this.maxConcurrentCallsPerServer = conf.getInt(
        HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);

    this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));

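    // Periodically sweep connections that have been idle for longer than minIdleTimeBeforeClose.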
    this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() {

      @Override
      public void run() {
        cleanupIdleConnections();
      }
    }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive="
          + this.tcpKeepAlive + ", tcpNoDelay=" + this.tcpNoDelay + ", connectTO=" + this.connectTO
          + ", readTO=" + this.readTO + ", writeTO=" + this.writeTO + ", minIdleTimeBeforeClose="
          + this.minIdleTimeBeforeClose + ", maxRetries=" + this.maxRetries + ", fallbackAllowed="
          + this.fallbackAllowed + ", bind address="
          + (this.localAddr != null ? this.localAddr : "null"));
    }
  }

  private void cleanupIdleConnections() {
    long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
    synchronized (connections) {
      for (T conn : connections.values()) {
        // Remove the connection if it has not been chosen by anyone for more than
        // minIdleTimeBeforeClose, and the connection itself has already shut down. The latter
        // check is because we may still have pending calls on the connection, so we should not
        // shut it down from the outside. The connection itself will disconnect if there is no
        // pending call for the idle period.
        if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress());
          }
          connections.remove(conn.remoteId(), conn);
          conn.cleanupConnection();
        }
      }
    }
  }

  public static String getDefaultCodec(final Configuration c) {
    // If "hbase.client.default.rpc.codec" is the empty string -- you can't set it to null because
    // Configuration will complain -- then there is no default codec (and we'll protobuf
    // everything). Otherwise the default is KeyValueCodec.
    return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
  }

  /**
   * Encapsulate the ugly casting and RuntimeException conversion in a helper method.
   * @return Codec to use on this client.
   */
  Codec getCodec() {
    // For NO CODEC, "hbase.client.rpc.codec" must be configured with the empty string AND
    // "hbase.client.default.rpc.codec" as well -- because the default is to do cell block
    // encoding.
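    // Illustrative sketch (not from the original source): a client that wants plain protobuf
    // with no cell blocks would clear both keys, e.g.
    //   conf.set(HConstants.RPC_CODEC_CONF_KEY, "");
    //   conf.set(DEFAULT_CODEC_CLASS, "");
    // getCodec() then returns null and hasCellBlockSupport() reports false.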
    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
    if (className == null || className.length() == 0) {
      return null;
    }
    try {
      return (Codec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting codec " + className, e);
    }
  }

  @Override
  public boolean hasCellBlockSupport() {
    return this.codec != null;
  }

  // for writing tests that want to throw exception when connecting.
  boolean isTcpNoDelay() {
    return tcpNoDelay;
  }

  /**
   * Encapsulate the ugly casting and RuntimeException conversion in a private method.
   * @param conf configuration
   * @return The compressor to use on this client.
   */
  private static CompressionCodec getCompressor(final Configuration conf) {
    String className = conf.get("hbase.client.rpc.compressor", null);
    if (className == null || className.isEmpty()) {
      return null;
    }
    try {
      return (CompressionCodec) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed getting compressor " + className, e);
    }
  }

  /**
   * Return the pool type specified in the configuration, which must be set to either
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   * {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}; otherwise it defaults to
   * the former. For applications with many user threads, use a small round-robin pool. For
   * applications with few user threads, you may want to try using a thread-local pool. In any
   * case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient} instances should not exceed
   * the operating system's hard limit on the number of connections.
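   * <p>
   * For example (illustrative only), to request a round-robin pool of up to 5 connections per
   * {@link ConnectionId}:
   *
   * <pre>
   * conf.set(HConstants.HBASE_CLIENT_IPC_POOL_TYPE, "RoundRobin");
   * conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 5);
   * </pre>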
   * @param config configuration
   * @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
   *         {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
   */
  private static PoolMap.PoolType getPoolType(Configuration config) {
    return PoolMap.PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE),
      PoolMap.PoolType.RoundRobin);
  }

  /**
   * Return the pool size specified in the configuration, which is applicable only if the pool type
   * is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
   * @param config configuration
   * @return the maximum pool size
   */
  private static int getPoolSize(Configuration config) {
    int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);

    if (poolSize <= 0) {
      LOG.warn("{} must be positive. Using default value: 1", HConstants.HBASE_CLIENT_IPC_POOL_SIZE);
      return 1;
    } else {
      return poolSize;
    }
  }

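  /**
   * Return the next call id. A CAS loop wraps the counter back to 0 at Integer.MAX_VALUE so call
   * ids stay non-negative.
   */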
  private int nextCallId() {
    int id, next;
    do {
      id = callIdCnt.get();
      next = id < Integer.MAX_VALUE ? id + 1 : 0;
    } while (!callIdCnt.compareAndSet(id, next));
    return id;
  }

  /**
   * Make a blocking call. Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
   *          {@link UserProvider#getCurrent()} makes a new instance of User each time, so it will
   *          result in a new Connection each time.
   * @return The Message response. Any cell data is carried by the {@link HBaseRpcController}.
   */
  private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
      Message param, Message returnType, final User ticket, final InetSocketAddress isa)
      throws ServiceException {
    BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
    callMethod(md, hrc, param, returnType, ticket, isa, done);
    Message val;
    try {
      val = done.get();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
    if (hrc.failed()) {
      throw new ServiceException(hrc.getFailed());
    } else {
      return val;
    }
  }

  /**
   * Get a connection from the pool, or create a new one and add it to the pool. Connections to a
   * given host/port are reused.
   */
  private T getConnection(ConnectionId remoteId) throws IOException {
    if (failedServers.isFailedServer(remoteId.getAddress())) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not trying to connect to " + remoteId.getAddress()
            + " because this server is in the failed servers list");
      }
      throw new FailedServerException(
          "This server is in the failed servers list: " + remoteId.getAddress());
    }
    T conn;
    synchronized (connections) {
      if (!running) {
        throw new StoppedRpcClientException();
      }
      conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId));
      conn.setLastTouched(EnvironmentEdgeManager.currentTime());
    }
    return conn;
  }

  /**
   * Create a connection for the given remote id. The returned connection is not yet connected.
   */
  protected abstract T createConnection(ConnectionId remoteId) throws IOException;

  private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
      RpcCallback<Message> callback) {
    call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
    if (metrics != null) {
      metrics.updateRpc(call.md, call.param, call.callStats);
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace(
        "Call: " + call.md.getName() + ", callTime: " + call.callStats.getCallTimeMs() + "ms");
    }
    if (call.error != null) {
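      // A RemoteException already describes the server-side failure; refresh its stack trace so
      // it points at this client call path. Other errors are wrapped with the remote address for
      // easier diagnosis.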
      if (call.error instanceof RemoteException) {
        call.error.fillInStackTrace();
        hrc.setFailed(call.error);
      } else {
        hrc.setFailed(wrapException(addr, call.error));
      }
      callback.run(null);
    } else {
      hrc.setDone(call.cells);
      callback.run(call.response);
    }
  }

  Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
      final Message param, Message returnType, final User ticket,
      final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
    final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
    cs.setStartTime(EnvironmentEdgeManager.currentTime());

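    // For batch (multi) requests, record how many individual actions the call carries so that
    // per-server metrics reflect the real load.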
    if (param instanceof ClientProtos.MultiRequest) {
      ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
      int numActions = 0;
      for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
        numActions += regionAction.getActionCount();
      }

      cs.setNumActionsPerServer(numActions);
    }

    final Address addr = Address.fromSocketAddress(inetAddr);
    final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
    Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
        hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
          @Override
          public void run(Call call) {
            counter.decrementAndGet();
            onCallFinished(call, hrc, addr, callback);
          }
        }, cs);
    ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
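    // Reserve a slot against the per-server concurrent-call limit before sending; the callback
    // above releases it when the call finishes.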
    int count = counter.incrementAndGet();
    try {
      if (count > maxConcurrentCallsPerServer) {
        throw new ServerTooBusyException(addr, count);
      }
      cs.setConcurrentCallsPerServer(count);
      T connection = getConnection(remoteId);
      connection.sendRequest(call, hrc);
    } catch (Exception e) {
      call.setException(toIOE(e));
    }
    return call;
  }

  private static Address createAddr(ServerName sn) {
    return Address.fromParts(sn.getHostname(), sn.getPort());
  }

  /**
   * Interrupt the connections to the given ip:port server. This should be called if the server is
   * known to be dead. It will not prevent in-flight operations from being retried, and, depending
   * on their own behavior, they may retry on the same server. This can be a feature, for example
   * at startup. In any case, they are likely to get a connection refused (if the process died) or
   * no route to host error, i.e. their next retries should fail faster and with a safe exception.
   */
  @Override
  public void cancelConnections(ServerName sn) {
    synchronized (connections) {
      for (T connection : connections.values()) {
        ConnectionId remoteId = connection.remoteId();
        if (remoteId.getAddress().getPort() == sn.getPort()
            && remoteId.getAddress().getHostName().equals(sn.getHostname())) {
          LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
              + connection.remoteId);
          connections.remove(remoteId, connection);
          connection.shutdown();
          connection.cleanupConnection();
        }
      }
    }
  }

  /**
   * Configure an {@link HBaseRpcController}.
   * @param controller the controller to configure
   * @param channelOperationTimeout timeout for the operation
   * @return the configured controller
   */
  static HBaseRpcController configureHBaseRpcController(
      RpcController controller, int channelOperationTimeout) {
    HBaseRpcController hrc;
    if (controller != null && controller instanceof HBaseRpcController) {
      hrc = (HBaseRpcController) controller;
      if (!hrc.hasCallTimeout()) {
        hrc.setCallTimeout(channelOperationTimeout);
      }
    } else {
      hrc = new HBaseRpcControllerImpl();
      hrc.setCallTimeout(channelOperationTimeout);
    }
    return hrc;
  }

  protected abstract void closeInternal();

  @Override
  public void close() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stopping rpc client");
    }
    Collection<T> connToClose;
    synchronized (connections) {
      if (!running) {
        return;
      }
      running = false;
      connToClose = connections.values();
      connections.clear();
    }
    cleanupIdleConnectionTask.cancel(true);
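    // Ask every connection to shut down first, then release their resources after the
    // client-specific teardown in closeInternal().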
    for (T conn : connToClose) {
      conn.shutdown();
    }
    closeInternal();
    for (T conn : connToClose) {
      conn.cleanupConnection();
    }
  }

  @Override
  public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
      int rpcTimeout) {
    return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
  }

  @Override
  public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
    return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
  }

  private static class AbstractRpcChannel {

    protected final Address addr;

    // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
    // per method call on the channel. If the remote target is removed or reprovisioned and
    // its identity changes a new channel with a newly resolved InetSocketAddress will be
    // created as part of retry, so caching here is fine.
    // Normally, caching an InetSocketAddress is an anti-pattern.
    protected InetSocketAddress isa;

    protected final AbstractRpcClient<?> rpcClient;

    protected final User ticket;

    protected final int rpcTimeout;

    protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr,
        User ticket, int rpcTimeout) {
      this.addr = addr;
      this.rpcClient = rpcClient;
      this.ticket = ticket;
      this.rpcTimeout = rpcTimeout;
    }

    /**
     * Configure an RPC controller.
     * @param controller the controller to configure
     * @return the configured RPC controller
     */
    protected HBaseRpcController configureRpcController(RpcController controller) {
      HBaseRpcController hrc;
      // TODO: Ideally we should not use an RpcController other than HBaseRpcController at client
      // side. And now we may use ServerRpcController.
      if (controller != null && controller instanceof HBaseRpcController) {
        hrc = (HBaseRpcController) controller;
        if (!hrc.hasCallTimeout()) {
          hrc.setCallTimeout(rpcTimeout);
        }
      } else {
        hrc = new HBaseRpcControllerImpl();
        hrc.setCallTimeout(rpcTimeout);
      }
      return hrc;
    }
  }

  /**
   * Blocking RPC channel that goes via HBase RPC.
   */
  public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
      implements BlockingRpcChannel {

    protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
        Address addr, User ticket, int rpcTimeout) {
      super(rpcClient, addr, ticket, rpcTimeout);
    }

    @Override
    public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
        Message param, Message returnType) throws ServiceException {
      // Look up remote address upon first call
      if (isa == null) {
        if (this.rpcClient.metrics != null) {
          this.rpcClient.metrics.incrNsLookups();
        }
        isa = Address.toSocketAddress(addr);
        if (isa.isUnresolved()) {
          if (this.rpcClient.metrics != null) {
            this.rpcClient.metrics.incrNsLookupsFailed();
          }
          isa = null;
          throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
        }
      }
      return rpcClient.callBlockingMethod(md, configureRpcController(controller),
        param, returnType, ticket, isa);
    }
  }

  /**
   * Async RPC channel that goes via HBase RPC.
   */
  public static class RpcChannelImplementation extends AbstractRpcChannel implements
      RpcChannel {

    protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
        User ticket, int rpcTimeout) {
      super(rpcClient, addr, ticket, rpcTimeout);
    }

    @Override
    public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
        Message param, Message returnType, RpcCallback<Message> done) {
      HBaseRpcController configuredController =
        configureRpcController(Preconditions.checkNotNull(controller,
          "RpcController can not be null for async rpc call"));
      // Look up remote address upon first call
      if (isa == null || isa.isUnresolved()) {
        if (this.rpcClient.metrics != null) {
          this.rpcClient.metrics.incrNsLookups();
        }
        isa = Address.toSocketAddress(addr);
        if (isa.isUnresolved()) {
          if (this.rpcClient.metrics != null) {
            this.rpcClient.metrics.incrNsLookupsFailed();
          }
          isa = null;
          controller.setFailed(addr + " could not be resolved");
          return;
        }
      }
      // This method does not throw any exceptions, so the caller must provide an
      // HBaseRpcController, which is used to pass exceptions back.
      this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
    }
  }
}