001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
024import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
025import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
026import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsentEx;
027
028import edu.umd.cs.findbugs.annotations.Nullable;
029import java.io.Closeable;
030import java.io.IOException;
031import java.io.InterruptedIOException;
032import java.lang.reflect.UndeclaredThrowableException;
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.Date;
036import java.util.List;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.LinkedBlockingQueue;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.atomic.AtomicInteger;
047import java.util.concurrent.locks.ReentrantLock;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CallQueueTooBigException;
050import org.apache.hadoop.hbase.DoNotRetryIOException;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.MasterNotRunningException;
054import org.apache.hadoop.hbase.MetaTableAccessor;
055import org.apache.hadoop.hbase.RegionLocations;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.TableNotEnabledException;
059import org.apache.hadoop.hbase.TableNotFoundException;
060import org.apache.hadoop.hbase.ZooKeeperConnectionException;
061import org.apache.hadoop.hbase.client.Scan.ReadType;
062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
063import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
064import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
065import org.apache.hadoop.hbase.exceptions.RegionMovedException;
066import org.apache.hadoop.hbase.ipc.RpcClient;
067import org.apache.hadoop.hbase.ipc.RpcClientFactory;
068import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
069import org.apache.hadoop.hbase.log.HBaseMarkers;
070import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
071import org.apache.hadoop.hbase.security.User;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.ExceptionUtil;
075import org.apache.hadoop.hbase.util.Pair;
076import org.apache.hadoop.hbase.util.ReflectionUtils;
077import org.apache.hadoop.hbase.util.Threads;
078import org.apache.hadoop.ipc.RemoteException;
079import org.apache.yetus.audience.InterfaceAudience;
080import org.apache.zookeeper.KeeperException;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
085import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
086import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
087import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
088import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
089import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
090import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
129
130/**
131 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
132 * Encapsulates connection to zookeeper and regionservers.
133 */
134@edu.umd.cs.findbugs.annotations.SuppressWarnings(
135    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
136    justification="Access to the conncurrent hash map is under a lock so should be fine.")
137@InterfaceAudience.Private
138class ConnectionImplementation implements ClusterConnection, Closeable {
  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

  /** Config key (default true) read into {@link #hostnamesCanChange}. */
  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

  // Passed to getStubKey(...) when building stub cache keys.
  private final boolean hostnamesCanChange;
  // Base client pause read from HConstants.HBASE_CLIENT_PAUSE.
  private final long pause;
  private final long pauseForCQTBE;// pause for CallQueueTooBigException; never smaller than pause
  // Not final: can be overridden by setUseMetaReplicas() in tests.
  private boolean useMetaReplicas;
  private final int metaReplicaCallTimeoutScanInMicroSecond;
  // Number of attempts, derived from the configured retry count via retries2Attempts().
  private final int numTries;
  // RPC timeout read from HConstants.HBASE_RPC_TIMEOUT_KEY.
  final int rpcTimeout;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * It is lazily created under nonceGeneratorCreateLock when nonces are enabled. Note that it is
   * NOT write-once: a connection built with nonces disabled overwrites it with
   * NO_NONCE_GENERATOR, and injectNonceGeneratorForTesting() replaces it as well.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** Lock taken by the constructor while lazily creating the shared {@link #nonceGenerator}. */
  private static final Object nonceGeneratorCreateLock = new Object();

  private final AsyncProcess asyncProcess;
  // single tracker per connection
  private final ServerStatisticTracker stats;

  private volatile boolean closed;
  private volatile boolean aborted;

  // package protected for the tests; null unless cluster status publishing is enabled
  ClusterStatusListener clusterStatusListener;

  // Serializes lookups of the meta region location in locateMeta().
  private final Object metaRegionLock = new Object();

  private final Object masterLock = new Object();

  // thread executor shared by all Table instances created
  // by this connection; either supplied to the constructor or lazily created by getBatchPool()
  private volatile ExecutorService batchPool = null;
  // meta thread executor shared by all Table instances created
  // by this connection; lazily created by getMetaLookupPool()
  private volatile ExecutorService metaLookupPool = null;
  // true only when this connection created batchPool itself and therefore must shut it down
  private volatile boolean cleanupPool = false;

  private final Configuration conf;

  // cache the configuration value for tables so that we can avoid calling
  // the expensive Configuration to fetch the value multiple times.
  private final ConnectionConfiguration connectionConfig;

  // Client rpc instance.
  private final RpcClient rpcClient;

  private final MetaCache metaCache;
  // null unless client-side metrics are enabled (CLIENT_SIDE_METRICS_ENABLED_KEY)
  private final MetricsConnection metrics;

  protected User user;

  private final RpcRetryingCallerFactory rpcCallerFactory;

  private final RpcControllerFactory rpcControllerFactory;

  private final RetryingCallerInterceptor interceptor;

  /**
   * Cluster registry of basic info such as clusterid and meta region location.
   */
  private final AsyncRegistry registry;

  private final ClientBackoffPolicy backoffPolicy;

  /**
   * Allow setting an alternate BufferedMutator implementation via
   * config (BufferedMutator.CLASSNAME_KEY). If null, use default.
   */
  private final String alternateBufferedMutatorClassName;

  /** lock guards against multiple threads trying to query the meta region at the same time */
  private final ReentrantLock userRegionLock = new ReentrantLock();
217
218  /**
219   * constructor
220   * @param conf Configuration object
221   */
222  ConnectionImplementation(Configuration conf,
223                           ExecutorService pool, User user) throws IOException {
224    this.conf = conf;
225    this.user = user;
226    this.batchPool = pool;
227    this.connectionConfig = new ConnectionConfiguration(conf);
228    this.closed = false;
229    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
230        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
231    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
232    if (configuredPauseForCQTBE < pause) {
233      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
234          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
235          + ", will use " + pause + " instead.");
236      this.pauseForCQTBE = pause;
237    } else {
238      this.pauseForCQTBE = configuredPauseForCQTBE;
239    }
240    this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
241      HConstants.DEFAULT_USE_META_REPLICAS);
242    this.metaReplicaCallTimeoutScanInMicroSecond =
243        connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
244
245    // how many times to try, one more than max *retry* time
246    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
247    this.rpcTimeout = conf.getInt(
248        HConstants.HBASE_RPC_TIMEOUT_KEY,
249        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
250    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
251      synchronized (nonceGeneratorCreateLock) {
252        if (nonceGenerator == null) {
253          nonceGenerator = PerClientRandomNonceGenerator.get();
254        }
255      }
256    } else {
257      nonceGenerator = NO_NONCE_GENERATOR;
258    }
259
260    this.stats = ServerStatisticTracker.create(conf);
261    this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
262    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
263    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
264    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
265    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
266    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
267      this.metrics = new MetricsConnection(this);
268    } else {
269      this.metrics = null;
270    }
271    this.metaCache = new MetaCache(this.metrics);
272
273    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
274        HConstants.STATUS_PUBLISHED_DEFAULT);
275    this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
276    Class<? extends ClusterStatusListener.Listener> listenerClass =
277        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
278            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
279            ClusterStatusListener.Listener.class);
280
281    // Is there an alternate BufferedMutator to use?
282    this.alternateBufferedMutatorClassName =
283        this.conf.get(BufferedMutator.CLASSNAME_KEY);
284
285    try {
286      this.registry = AsyncRegistryFactory.getRegistry(conf);
287      retrieveClusterId();
288
289      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
290
291      // Do we publish the status?
292      if (shouldListen) {
293        if (listenerClass == null) {
294          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
295              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
296        } else {
297          clusterStatusListener = new ClusterStatusListener(
298              new ClusterStatusListener.DeadServerHandler() {
299                @Override
300                public void newDead(ServerName sn) {
301                  clearCaches(sn);
302                  rpcClient.cancelConnections(sn);
303                }
304              }, conf, listenerClass);
305        }
306      }
307    } catch (Throwable e) {
308      // avoid leaks: registry, rpcClient, ...
309      LOG.debug("connection construction failed", e);
310      close();
311      throw e;
312    }
313  }
314
315  /**
316   * @param useMetaReplicas
317   */
318  @VisibleForTesting
319  void setUseMetaReplicas(final boolean useMetaReplicas) {
320    this.useMetaReplicas = useMetaReplicas;
321  }
322
323  /**
324   * @param conn The connection for which to replace the generator.
325   * @param cnm Replaces the nonce generator used, for testing.
326   * @return old nonce generator.
327   */
328  @VisibleForTesting
329  static NonceGenerator injectNonceGeneratorForTesting(
330      ClusterConnection conn, NonceGenerator cnm) {
331    ConnectionImplementation connImpl = (ConnectionImplementation)conn;
332    NonceGenerator ng = connImpl.getNonceGenerator();
333    LOG.warn("Nonce generator is being replaced by test code for "
334      + cnm.getClass().getName());
335    nonceGenerator = cnm;
336    return ng;
337  }
338
339  @Override
340  public Table getTable(TableName tableName) throws IOException {
341    return getTable(tableName, getBatchPool());
342  }
343
344  @Override
345  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
346    return new TableBuilderBase(tableName, connectionConfig) {
347
348      @Override
349      public Table build() {
350        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
351            rpcControllerFactory, pool);
352      }
353    };
354  }
355
356  @Override
357  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
358    if (params.getTableName() == null) {
359      throw new IllegalArgumentException("TableName cannot be null.");
360    }
361    if (params.getPool() == null) {
362      params.pool(HTable.getDefaultExecutor(getConfiguration()));
363    }
364    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
365      params.writeBufferSize(connectionConfig.getWriteBufferSize());
366    }
367    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
368      params.setWriteBufferPeriodicFlushTimeoutMs(
369              connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
370    }
371    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
372      params.setWriteBufferPeriodicFlushTimerTickMs(
373              connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
374    }
375    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
376      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
377    }
378    // Look to see if an alternate BufferedMutation implementation is wanted.
379    // Look in params and in config. If null, use default.
380    String implementationClassName = params.getImplementationClassName();
381    if (implementationClassName == null) {
382      implementationClassName = this.alternateBufferedMutatorClassName;
383    }
384    if (implementationClassName == null) {
385      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
386    }
387    try {
388      return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
389          this, rpcCallerFactory, rpcControllerFactory, params);
390    } catch (ClassNotFoundException e) {
391      throw new RuntimeException(e);
392    }
393  }
394
395  @Override
396  public BufferedMutator getBufferedMutator(TableName tableName) {
397    return getBufferedMutator(new BufferedMutatorParams(tableName));
398  }
399
400  @Override
401  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
402    return new HRegionLocator(tableName, this);
403  }
404
405  @Override
406  public Admin getAdmin() throws IOException {
407    return new HBaseAdmin(this);
408  }
409
410  @Override
411  public Hbck getHbck() throws IOException {
412    return getHbck(get(registry.getMasterAddress()));
413  }
414
415  @Override
416  public Hbck getHbck(ServerName masterServer) throws IOException {
417    checkClosed();
418    if (isDeadServer(masterServer)) {
419      throw new RegionServerStoppedException(masterServer + " is dead.");
420    }
421    String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
422      masterServer, this.hostnamesCanChange);
423
424    return new HBaseHbck(
425      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
426        BlockingRpcChannel channel =
427          this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
428        return MasterProtos.HbckService.newBlockingStub(channel);
429      }), rpcControllerFactory);
430  }
431
432  @Override
433  public MetricsConnection getConnectionMetrics() {
434    return this.metrics;
435  }
436
437  private ExecutorService getBatchPool() {
438    if (batchPool == null) {
439      synchronized (this) {
440        if (batchPool == null) {
441          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
442          this.batchPool = getThreadPool(threads, threads, "-shared", null);
443          this.cleanupPool = true;
444        }
445      }
446    }
447    return this.batchPool;
448  }
449
450  private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
451      BlockingQueue<Runnable> passedWorkQueue) {
452    // shared HTable thread executor not yet initialized
453    if (maxThreads == 0) {
454      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
455    }
456    if (coreThreads == 0) {
457      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
458    }
459    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
460    BlockingQueue<Runnable> workQueue = passedWorkQueue;
461    if (workQueue == null) {
462      workQueue =
463        new LinkedBlockingQueue<>(maxThreads *
464            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
465                HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
466      coreThreads = maxThreads;
467    }
468    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
469        coreThreads,
470        maxThreads,
471        keepAliveTime,
472        TimeUnit.SECONDS,
473        workQueue,
474        Threads.newDaemonThreadFactory(toString() + nameHint));
475    tpe.allowCoreThreadTimeOut(true);
476    return tpe;
477  }
478
479  private ExecutorService getMetaLookupPool() {
480    if (this.metaLookupPool == null) {
481      synchronized (this) {
482        if (this.metaLookupPool == null) {
483          //Some of the threads would be used for meta replicas
484          //To start with, threads.max.core threads can hit the meta (including replicas).
485          //After that, requests will get queued up in the passed queue, and only after
486          //the queue is full, a new thread will be started
487          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
488          this.metaLookupPool = getThreadPool(
489             threads,
490             threads,
491             "-metaLookup-shared-", new LinkedBlockingQueue<>());
492        }
493      }
494    }
495    return this.metaLookupPool;
496  }
497
498  protected ExecutorService getCurrentMetaLookupPool() {
499    return metaLookupPool;
500  }
501
502  protected ExecutorService getCurrentBatchPool() {
503    return batchPool;
504  }
505
506  private void shutdownPools() {
507    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
508      shutdownBatchPool(this.batchPool);
509    }
510    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
511      shutdownBatchPool(this.metaLookupPool);
512    }
513  }
514
515  private void shutdownBatchPool(ExecutorService pool) {
516    pool.shutdown();
517    try {
518      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
519        pool.shutdownNow();
520      }
521    } catch (InterruptedException e) {
522      pool.shutdownNow();
523    }
524  }
525
526  /**
527   * For tests only.
528   */
529  @VisibleForTesting
530  RpcClient getRpcClient() {
531    return rpcClient;
532  }
533
534  /**
535   * An identifier that will remain the same for a given connection.
536   */
537  @Override
538  public String toString(){
539    return "hconnection-0x" + Integer.toHexString(hashCode());
540  }
541
542  protected String clusterId = null;
543
544  protected void retrieveClusterId() {
545    if (clusterId != null) {
546      return;
547    }
548    try {
549      this.clusterId = this.registry.getClusterId().get();
550    } catch (InterruptedException | ExecutionException e) {
551      LOG.warn("Retrieve cluster id failed", e);
552    }
553    if (clusterId == null) {
554      clusterId = HConstants.CLUSTER_ID_DEFAULT;
555      LOG.debug("clusterid came back null, using default " + clusterId);
556    }
557  }
558
559  @Override
560  public Configuration getConfiguration() {
561    return this.conf;
562  }
563
564  private void checkClosed() throws DoNotRetryIOException {
565    if (this.closed) {
566      throw new DoNotRetryIOException(toString() + " closed");
567    }
568  }
569
570  /**
571   * @return true if the master is running, throws an exception otherwise
572   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
573   * @deprecated this has been deprecated without a replacement
574   */
575  @Deprecated
576  @Override
577  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
578    // When getting the master connection, we check it's running,
579    // so if there is no exception, it means we've been able to get a
580    // connection on a running master
581    MasterKeepAliveConnection m;
582    try {
583      m = getKeepAliveMasterService();
584    } catch (IOException e) {
585      throw new MasterNotRunningException(e);
586    }
587    m.close();
588    return true;
589  }
590
591  @Override
592  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
593      boolean reload) throws IOException {
594    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
595  }
596
597
598  @Override
599  public boolean isTableEnabled(TableName tableName) throws IOException {
600    return getTableState(tableName).inStates(TableState.State.ENABLED);
601  }
602
603  @Override
604  public boolean isTableDisabled(TableName tableName) throws IOException {
605    return getTableState(tableName).inStates(TableState.State.DISABLED);
606  }
607
608  @Override
609  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
610      throws IOException {
611    checkClosed();
612    try {
613      if (!isTableEnabled(tableName)) {
614        LOG.debug("Table {} not enabled", tableName);
615        return false;
616      }
617      List<Pair<RegionInfo, ServerName>> locations =
618        MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);
619
620      int notDeployed = 0;
621      int regionCount = 0;
622      for (Pair<RegionInfo, ServerName> pair : locations) {
623        RegionInfo info = pair.getFirst();
624        if (pair.getSecond() == null) {
625          LOG.debug("Table {} has not deployed region {}", tableName,
626              pair.getFirst().getEncodedName());
627          notDeployed++;
628        } else if (splitKeys != null
629            && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
630          for (byte[] splitKey : splitKeys) {
631            // Just check if the splitkey is available
632            if (Bytes.equals(info.getStartKey(), splitKey)) {
633              regionCount++;
634              break;
635            }
636          }
637        } else {
638          // Always empty start row should be counted
639          regionCount++;
640        }
641      }
642      if (notDeployed > 0) {
643        if (LOG.isDebugEnabled()) {
644          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
645        }
646        return false;
647      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
648        if (LOG.isDebugEnabled()) {
649          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
650              splitKeys.length + 1, regionCount);
651        }
652        return false;
653      } else {
654        LOG.trace("Table {} should be available", tableName);
655        return true;
656      }
657    } catch (TableNotFoundException tnfe) {
658      LOG.warn("Table {} does not exist", tableName);
659      return false;
660    }
661  }
662
663  @Override
664  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
665    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
666      RegionInfo.getStartKey(regionName), false, true);
667    return locations == null ? null : locations.getRegionLocation();
668  }
669
670  private boolean isDeadServer(ServerName sn) {
671    if (clusterStatusListener == null) {
672      return false;
673    } else {
674      return clusterStatusListener.isDeadServer(sn);
675    }
676  }
677
678  @Override
679  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
680    return locateRegions(tableName, false, true);
681  }
682
683  @Override
684  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
685      boolean offlined) throws IOException {
686    List<RegionInfo> regions;
687    if (TableName.isMetaTableName(tableName)) {
688      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
689    } else {
690      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
691    }
692    List<HRegionLocation> locations = new ArrayList<>();
693    for (RegionInfo regionInfo : regions) {
694      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
695        continue;
696      }
697      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
698      if (list != null) {
699        for (HRegionLocation loc : list.getRegionLocations()) {
700          if (loc != null) {
701            locations.add(loc);
702          }
703        }
704      }
705    }
706    return locations;
707  }
708
709  @Override
710  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
711      throws IOException {
712    RegionLocations locations = locateRegion(tableName, row, true, true);
713    return locations == null ? null : locations.getRegionLocation();
714  }
715
716  @Override
717  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
718      throws IOException {
719    RegionLocations locations =
720      relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
721    return locations == null ? null
722      : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
723  }
724
725  @Override
726  public RegionLocations relocateRegion(final TableName tableName,
727      final byte [] row, int replicaId) throws IOException{
728    // Since this is an explicit request not to use any caching, finding
729    // disabled tables should not be desirable.  This will ensure that an exception is thrown when
730    // the first time a disabled table is interacted with.
731    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
732      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
733    }
734
735    return locateRegion(tableName, row, false, true, replicaId);
736  }
737
738  @Override
739  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
740      boolean retry) throws IOException {
741    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
742  }
743
744  @Override
745  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
746      boolean retry, int replicaId) throws IOException {
747    checkClosed();
748    if (tableName == null || tableName.getName().length == 0) {
749      throw new IllegalArgumentException("table name cannot be null or zero length");
750    }
751    if (tableName.equals(TableName.META_TABLE_NAME)) {
752      return locateMeta(tableName, useCache, replicaId);
753    } else {
754      // Region not in the cache - have to go to the meta RS
755      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
756    }
757  }
758
759  private RegionLocations locateMeta(final TableName tableName,
760      boolean useCache, int replicaId) throws IOException {
761    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
762    // zookeeper with one request for every region lookup. We cache the META with empty row
763    // key in MetaCache.
764    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
765    RegionLocations locations = null;
766    if (useCache) {
767      locations = getCachedLocation(tableName, metaCacheKey);
768      if (locations != null && locations.getRegionLocation(replicaId) != null) {
769        return locations;
770      }
771    }
772
773    // only one thread should do the lookup.
774    synchronized (metaRegionLock) {
775      // Check the cache again for a hit in case some other thread made the
776      // same query while we were waiting on the lock.
777      if (useCache) {
778        locations = getCachedLocation(tableName, metaCacheKey);
779        if (locations != null && locations.getRegionLocation(replicaId) != null) {
780          return locations;
781        }
782      }
783
784      // Look up from zookeeper
785      locations = get(this.registry.getMetaRegionLocation());
786      if (locations != null) {
787        cacheLocation(tableName, locations);
788      }
789    }
790    return locations;
791  }
792
793  /**
794   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
795   * seeking.
796   */
797  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
798      boolean retry, int replicaId) throws IOException {
799    // If we are supposed to be using the cache, look in the cache to see if we already have the
800    // region.
801    if (useCache) {
802      RegionLocations locations = getCachedLocation(tableName, row);
803      if (locations != null && locations.getRegionLocation(replicaId) != null) {
804        return locations;
805      }
806    }
807    // build the key of the meta region we should be looking for.
808    // the extra 9's on the end are necessary to allow "exact" matches
809    // without knowing the precise region names.
810    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
811    byte[] metaStopKey =
812      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
813    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
814      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
815      .setReadType(ReadType.PREAD);
816    if (this.useMetaReplicas) {
817      s.setConsistency(Consistency.TIMELINE);
818    }
819    int maxAttempts = (retry ? numTries : 1);
820    for (int tries = 0; ; tries++) {
821      if (tries >= maxAttempts) {
822        throw new NoServerForRegionException("Unable to find region for "
823            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
824      }
825      if (useCache) {
826        RegionLocations locations = getCachedLocation(tableName, row);
827        if (locations != null && locations.getRegionLocation(replicaId) != null) {
828          return locations;
829        }
830      } else {
831        // If we are not supposed to be using the cache, delete any existing cached location
832        // so it won't interfere.
833        // We are only supposed to clean the cache for the specific replicaId
834        metaCache.clearCache(tableName, row, replicaId);
835      }
836      // Query the meta region
837      long pauseBase = this.pause;
838      userRegionLock.lock();
839      try {
840        if (useCache) {// re-check cache after get lock
841          RegionLocations locations = getCachedLocation(tableName, row);
842          if (locations != null && locations.getRegionLocation(replicaId) != null) {
843            return locations;
844          }
845        }
846        s.resetMvccReadPoint();
847        try (ReversedClientScanner rcs =
848          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
849            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
850          boolean tableNotFound = true;
851          for (;;) {
852            Result regionInfoRow = rcs.next();
853            if (regionInfoRow == null) {
854              if (tableNotFound) {
855                throw new TableNotFoundException(tableName);
856              } else {
857                throw new IOException(
858                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
859              }
860            }
861            tableNotFound = false;
862            // convert the row result into the HRegionLocation we need!
863            RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
864            if (locations == null || locations.getRegionLocation(replicaId) == null) {
865              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
866            }
867            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
868            if (regionInfo == null) {
869              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
870                ", row=" + regionInfoRow);
871            }
872            // See HBASE-20182. It is possible that we locate to a split parent even after the
873            // children are online, so here we need to skip this region and go to the next one.
874            if (regionInfo.isSplitParent()) {
875              continue;
876            }
877            if (regionInfo.isOffline()) {
878              throw new RegionOfflineException("Region offline; disable table call? " +
879                  regionInfo.getRegionNameAsString());
880            }
881            // It is possible that the split children have not been online yet and we have skipped
882            // the parent in the above condition, so we may have already reached a region which does
883            // not contains us.
884            if (!regionInfo.containsRow(row)) {
885              throw new IOException(
886                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
887            }
888            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
889            if (serverName == null) {
890              throw new NoServerForRegionException("No server address listed in " +
891                TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
892                " containing row " + Bytes.toStringBinary(row));
893            }
894            if (isDeadServer(serverName)) {
895              throw new RegionServerStoppedException(
896                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
897                  " is managed by the server " + serverName + ", but it is dead.");
898            }
899            // Instantiate the location
900            cacheLocation(tableName, locations);
901            return locations;
902          }
903        }
904      } catch (TableNotFoundException e) {
905        // if we got this error, probably means the table just plain doesn't
906        // exist. rethrow the error immediately. this should always be coming
907        // from the HTable constructor.
908        throw e;
909      } catch (IOException e) {
910        ExceptionUtil.rethrowIfInterrupt(e);
911        if (e instanceof RemoteException) {
912          e = ((RemoteException)e).unwrapRemoteException();
913        }
914        if (e instanceof CallQueueTooBigException) {
915          // Give a special check on CallQueueTooBigException, see #HBASE-17114
916          pauseBase = this.pauseForCQTBE;
917        }
918        if (tries < maxAttempts - 1) {
919          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
920            "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
921        } else {
922          throw e;
923        }
924        // Only relocate the parent region if necessary
925        if(!(e instanceof RegionOfflineException ||
926            e instanceof NoServerForRegionException)) {
927          relocateRegion(TableName.META_TABLE_NAME, metaStartKey, replicaId);
928        }
929      } finally {
930        userRegionLock.unlock();
931      }
932      try{
933        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
934      } catch (InterruptedException e) {
935        throw new InterruptedIOException("Giving up trying to location region in " +
936          "meta: thread is interrupted.");
937      }
938    }
939  }
940
941  /**
942   * Put a newly discovered HRegionLocation into the cache.
943   * @param tableName The table name.
944   * @param location the new location
945   */
946  @Override
947  public void cacheLocation(final TableName tableName, final RegionLocations location) {
948    metaCache.cacheLocation(tableName, location);
949  }
950
951  /**
952   * Search the cache for a location that fits our table and row key.
953   * Return null if no suitable region is located.
954   * @return Null or region location found in cache.
955   */
956  RegionLocations getCachedLocation(final TableName tableName,
957      final byte [] row) {
958    return metaCache.getCachedLocation(tableName, row);
959  }
960
961  public void clearRegionCache(final TableName tableName, byte[] row) {
962    metaCache.clearCache(tableName, row);
963  }
964
965  /*
966   * Delete all cached entries of a table that maps to a specific location.
967   */
968  @Override
969  public void clearCaches(final ServerName serverName) {
970    metaCache.clearCache(serverName);
971  }
972
973  @Override
974  public void clearRegionCache() {
975    metaCache.clearCache();
976  }
977
978  @Override
979  public void clearRegionCache(final TableName tableName) {
980    metaCache.clearCache(tableName);
981  }
982
983  /**
984   * Put a newly discovered HRegionLocation into the cache.
985   * @param tableName The table name.
986   * @param source the source of the new location, if it's not coming from meta
987   * @param location the new location
988   */
989  private void cacheLocation(final TableName tableName, final ServerName source,
990      final HRegionLocation location) {
991    metaCache.cacheLocation(tableName, source, location);
992  }
993
  // Map keyed by service name + regionserver to service stub implementation.
  // Keys are produced by getStubKey(service name, server, hostnamesCanChange); values are the
  // protobuf blocking stubs, created lazily on first use through computeIfAbsentEx.
  // A ConcurrentMap is used presumably because getAdmin/getClient/makeStub may run concurrently.
  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
996
997  /**
998   * State of the MasterService connection/setup.
999   */
1000  static class MasterServiceState {
1001    Connection connection;
1002
1003    MasterProtos.MasterService.BlockingInterface stub;
1004    int userCount;
1005
1006    MasterServiceState(final Connection connection) {
1007      super();
1008      this.connection = connection;
1009    }
1010
1011    @Override
1012    public String toString() {
1013      return "MasterService";
1014    }
1015
1016    Object getStub() {
1017      return this.stub;
1018    }
1019
1020    void clearStub() {
1021      this.stub = null;
1022    }
1023
1024    boolean isMasterRunning() throws IOException {
1025      MasterProtos.IsMasterRunningResponse response = null;
1026      try {
1027        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1028      } catch (Exception e) {
1029        throw ProtobufUtil.handleRemoteException(e);
1030      }
1031      return response != null? response.getIsMasterRunning(): false;
1032    }
1033  }
1034
1035  /**
1036   * The record of errors for servers.
1037   */
1038  static class ServerErrorTracker {
1039    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
1040    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>();
1041    private final long canRetryUntil;
1042    private final int maxTries;// max number to try
1043    private final long startTrackingTime;
1044
1045    /**
1046     * Constructor
1047     * @param timeout how long to wait before timeout, in unit of millisecond
1048     * @param maxTries how many times to try
1049     */
1050    public ServerErrorTracker(long timeout, int maxTries) {
1051      this.maxTries = maxTries;
1052      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
1053      this.startTrackingTime = new Date().getTime();
1054    }
1055
1056    /**
1057     * We stop to retry when we have exhausted BOTH the number of tries and the time allocated.
1058     * @param numAttempt how many times we have tried by now
1059     */
1060    boolean canTryMore(int numAttempt) {
1061      // If there is a single try we must not take into account the time.
1062      return numAttempt < maxTries || (maxTries > 1 &&
1063          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
1064    }
1065
1066    /**
1067     * Calculates the back-off time for a retrying request to a particular server.
1068     *
1069     * @param server    The server in question.
1070     * @param basePause The default hci pause.
1071     * @return The time to wait before sending next request.
1072     */
1073    long calculateBackoffTime(ServerName server, long basePause) {
1074      long result;
1075      ServerErrors errorStats = errorsByServer.get(server);
1076      if (errorStats != null) {
1077        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
1078      } else {
1079        result = 0; // yes, if the server is not in our list we don't wait before retrying.
1080      }
1081      return result;
1082    }
1083
1084    /**
1085     * Reports that there was an error on the server to do whatever bean-counting necessary.
1086     * @param server The server in question.
1087     */
1088    void reportServerError(ServerName server) {
1089      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
1090    }
1091
1092    long getStartTrackingTime() {
1093      return startTrackingTime;
1094    }
1095
1096    /**
1097     * The record of errors for a server.
1098     */
1099    private static class ServerErrors {
1100      private final AtomicInteger retries = new AtomicInteger(0);
1101
1102      public int getCount() {
1103        return retries.get();
1104      }
1105
1106      public void addError() {
1107        retries.incrementAndGet();
1108      }
1109    }
1110  }
1111
1112  /**
1113   * Class to make a MasterServiceStubMaker stub.
1114   */
1115  private final class MasterServiceStubMaker {
1116
1117    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
1118        throws IOException {
1119      try {
1120        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1121      } catch (ServiceException e) {
1122        throw ProtobufUtil.handleRemoteException(e);
1123      }
1124    }
1125
1126    /**
1127     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
1128     * services nor their interfaces. Let the caller do appropriate casting.
1129     * @return A stub for master services.
1130     */
1131    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
1132        throws IOException, KeeperException {
1133      ServerName sn = get(registry.getMasterAddress());
1134      if (sn == null) {
1135        String msg = "ZooKeeper available but no active master location found";
1136        LOG.info(msg);
1137        throw new MasterNotRunningException(msg);
1138      }
1139      if (isDeadServer(sn)) {
1140        throw new MasterNotRunningException(sn + " is dead.");
1141      }
1142      // Use the security info interface name as our stub key
1143      String key =
1144          getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
1145      MasterProtos.MasterService.BlockingInterface stub =
1146          (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1147            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1148            return MasterProtos.MasterService.newBlockingStub(channel);
1149          });
1150      isMasterRunning(stub);
1151      return stub;
1152    }
1153
1154    /**
1155     * Create a stub against the master. Retry if necessary.
1156     * @return A stub to do <code>intf</code> against the master
1157     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
1158     */
1159    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
1160      // The lock must be at the beginning to prevent multiple master creations
1161      // (and leaks) in a multithread context
1162      synchronized (masterLock) {
1163        Exception exceptionCaught = null;
1164        if (!closed) {
1165          try {
1166            return makeStubNoRetries();
1167          } catch (IOException e) {
1168            exceptionCaught = e;
1169          } catch (KeeperException e) {
1170            exceptionCaught = e;
1171          }
1172          throw new MasterNotRunningException(exceptionCaught);
1173        } else {
1174          throw new DoNotRetryIOException("Connection was closed while trying to get master");
1175        }
1176      }
1177    }
1178  }
1179
1180  @Override
1181  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
1182    return getAdmin(get(registry.getMasterAddress()));
1183  }
1184
1185  @Override
1186  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
1187      throws IOException {
1188    checkClosed();
1189    if (isDeadServer(serverName)) {
1190      throw new RegionServerStoppedException(serverName + " is dead.");
1191    }
1192    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
1193      this.hostnamesCanChange);
1194    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1195      BlockingRpcChannel channel =
1196          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1197      return AdminProtos.AdminService.newBlockingStub(channel);
1198    });
1199  }
1200
1201  @Override
1202  public BlockingInterface getClient(ServerName serverName) throws IOException {
1203    checkClosed();
1204    if (isDeadServer(serverName)) {
1205      throw new RegionServerStoppedException(serverName + " is dead.");
1206    }
1207    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
1208      serverName, this.hostnamesCanChange);
1209    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1210      BlockingRpcChannel channel =
1211          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1212      return ClientProtos.ClientService.newBlockingStub(channel);
1213    });
1214  }
1215
  // Shared master-service state (stub and user count) for this connection. It is updated under
  // masterLock at least in getKeepAliveMasterService; other access paths are not visible here.
  final MasterServiceState masterServiceState = new MasterServiceState(this);
1217
1218  @Override
1219  public MasterKeepAliveConnection getMaster() throws IOException {
1220    return getKeepAliveMasterService();
1221  }
1222
1223  private void resetMasterServiceState(final MasterServiceState mss) {
1224    mss.userCount++;
1225  }
1226
1227  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
1228    synchronized (masterLock) {
1229      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1230        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1231        this.masterServiceState.stub = stubMaker.makeStub();
1232      }
1233      resetMasterServiceState(this.masterServiceState);
1234    }
1235    // Ugly delegation just so we can add in a Close method.
1236    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
1237    return new MasterKeepAliveConnection() {
1238      MasterServiceState mss = masterServiceState;
1239
1240      @Override
1241      public MasterProtos.AbortProcedureResponse abortProcedure(
1242          RpcController controller,
1243          MasterProtos.AbortProcedureRequest request) throws ServiceException {
1244        return stub.abortProcedure(controller, request);
1245      }
1246
1247      @Override
1248      public MasterProtos.GetProceduresResponse getProcedures(
1249          RpcController controller,
1250          MasterProtos.GetProceduresRequest request) throws ServiceException {
1251        return stub.getProcedures(controller, request);
1252      }
1253
1254      @Override
1255      public MasterProtos.GetLocksResponse getLocks(
1256          RpcController controller,
1257          MasterProtos.GetLocksRequest request) throws ServiceException {
1258        return stub.getLocks(controller, request);
1259      }
1260
1261      @Override
1262      public MasterProtos.AddColumnResponse addColumn(
1263          RpcController controller,
1264          MasterProtos.AddColumnRequest request) throws ServiceException {
1265        return stub.addColumn(controller, request);
1266      }
1267
1268      @Override
1269      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
1270          MasterProtos.DeleteColumnRequest request)
1271      throws ServiceException {
1272        return stub.deleteColumn(controller, request);
1273      }
1274
1275      @Override
1276      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
1277          MasterProtos.ModifyColumnRequest request)
1278      throws ServiceException {
1279        return stub.modifyColumn(controller, request);
1280      }
1281
1282      @Override
1283      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
1284          MasterProtos.MoveRegionRequest request) throws ServiceException {
1285        return stub.moveRegion(controller, request);
1286      }
1287
1288      @Override
1289      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(
1290          RpcController controller, MasterProtos.MergeTableRegionsRequest request)
1291          throws ServiceException {
1292        return stub.mergeTableRegions(controller, request);
1293      }
1294
1295      @Override
1296      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
1297          MasterProtos.AssignRegionRequest request) throws ServiceException {
1298        return stub.assignRegion(controller, request);
1299      }
1300
1301      @Override
1302      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
1303          MasterProtos.UnassignRegionRequest request) throws ServiceException {
1304        return stub.unassignRegion(controller, request);
1305      }
1306
1307      @Override
1308      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
1309          MasterProtos.OfflineRegionRequest request) throws ServiceException {
1310        return stub.offlineRegion(controller, request);
1311      }
1312
1313      @Override
1314      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
1315          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
1316        return stub.splitRegion(controller, request);
1317      }
1318
1319      @Override
1320      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
1321          MasterProtos.DeleteTableRequest request) throws ServiceException {
1322        return stub.deleteTable(controller, request);
1323      }
1324
1325      @Override
1326      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
1327          MasterProtos.TruncateTableRequest request) throws ServiceException {
1328        return stub.truncateTable(controller, request);
1329      }
1330
1331      @Override
1332      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
1333          MasterProtos.EnableTableRequest request) throws ServiceException {
1334        return stub.enableTable(controller, request);
1335      }
1336
1337      @Override
1338      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
1339          MasterProtos.DisableTableRequest request) throws ServiceException {
1340        return stub.disableTable(controller, request);
1341      }
1342
1343      @Override
1344      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
1345          MasterProtos.ModifyTableRequest request) throws ServiceException {
1346        return stub.modifyTable(controller, request);
1347      }
1348
1349      @Override
1350      public MasterProtos.CreateTableResponse createTable(RpcController controller,
1351          MasterProtos.CreateTableRequest request) throws ServiceException {
1352        return stub.createTable(controller, request);
1353      }
1354
1355      @Override
1356      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
1357          MasterProtos.ShutdownRequest request) throws ServiceException {
1358        return stub.shutdown(controller, request);
1359      }
1360
1361      @Override
1362      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
1363          MasterProtos.StopMasterRequest request) throws ServiceException {
1364        return stub.stopMaster(controller, request);
1365      }
1366
1367      @Override
1368      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1369          final RpcController controller,
1370          final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
1371        return stub.isMasterInMaintenanceMode(controller, request);
1372      }
1373
1374      @Override
1375      public MasterProtos.BalanceResponse balance(RpcController controller,
1376          MasterProtos.BalanceRequest request) throws ServiceException {
1377        return stub.balance(controller, request);
1378      }
1379
1380      @Override
1381      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(
1382          RpcController controller, MasterProtos.SetBalancerRunningRequest request)
1383          throws ServiceException {
1384        return stub.setBalancerRunning(controller, request);
1385      }
1386
1387      @Override
1388      public NormalizeResponse normalize(RpcController controller,
1389          NormalizeRequest request) throws ServiceException {
1390        return stub.normalize(controller, request);
1391      }
1392
1393      @Override
1394      public SetNormalizerRunningResponse setNormalizerRunning(
1395          RpcController controller, SetNormalizerRunningRequest request)
1396          throws ServiceException {
1397        return stub.setNormalizerRunning(controller, request);
1398      }
1399
1400      @Override
1401      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
1402          MasterProtos.RunCatalogScanRequest request) throws ServiceException {
1403        return stub.runCatalogScan(controller, request);
1404      }
1405
1406      @Override
1407      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
1408          RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
1409          throws ServiceException {
1410        return stub.enableCatalogJanitor(controller, request);
1411      }
1412
1413      @Override
1414      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1415          RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
1416          throws ServiceException {
1417        return stub.isCatalogJanitorEnabled(controller, request);
1418      }
1419
1420      @Override
1421      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
1422          MasterProtos.RunCleanerChoreRequest request)
1423          throws ServiceException {
1424        return stub.runCleanerChore(controller, request);
1425      }
1426
1427      @Override
1428      public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning(
1429          RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request)
1430          throws ServiceException {
1431        return stub.setCleanerChoreRunning(controller, request);
1432      }
1433
1434      @Override
1435      public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled(
1436          RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request)
1437          throws ServiceException {
1438        return stub.isCleanerChoreEnabled(controller, request);
1439      }
1440
1441      @Override
1442      public ClientProtos.CoprocessorServiceResponse execMasterService(
1443          RpcController controller, ClientProtos.CoprocessorServiceRequest request)
1444          throws ServiceException {
1445        return stub.execMasterService(controller, request);
1446      }
1447
1448      @Override
1449      public MasterProtos.SnapshotResponse snapshot(RpcController controller,
1450          MasterProtos.SnapshotRequest request) throws ServiceException {
1451        return stub.snapshot(controller, request);
1452      }
1453
1454      @Override
1455      public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
1456          RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
1457          throws ServiceException {
1458        return stub.getCompletedSnapshots(controller, request);
1459      }
1460
1461      @Override
1462      public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1463          MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
1464        return stub.deleteSnapshot(controller, request);
1465      }
1466
1467      @Override
1468      public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1469          MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
1470        return stub.isSnapshotDone(controller, request);
1471      }
1472
1473      @Override
1474      public MasterProtos.RestoreSnapshotResponse restoreSnapshot(
1475          RpcController controller, MasterProtos.RestoreSnapshotRequest request)
1476          throws ServiceException {
1477        return stub.restoreSnapshot(controller, request);
1478      }
1479
1480      @Override
1481      public MasterProtos.ExecProcedureResponse execProcedure(
1482          RpcController controller, MasterProtos.ExecProcedureRequest request)
1483          throws ServiceException {
1484        return stub.execProcedure(controller, request);
1485      }
1486
1487      @Override
1488      public MasterProtos.ExecProcedureResponse execProcedureWithRet(
1489          RpcController controller, MasterProtos.ExecProcedureRequest request)
1490          throws ServiceException {
1491        return stub.execProcedureWithRet(controller, request);
1492      }
1493
1494      @Override
1495      public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
1496          MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
1497        return stub.isProcedureDone(controller, request);
1498      }
1499
1500      @Override
1501      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
1502          MasterProtos.GetProcedureResultRequest request) throws ServiceException {
1503        return stub.getProcedureResult(controller, request);
1504      }
1505
1506      @Override
1507      public MasterProtos.IsMasterRunningResponse isMasterRunning(
1508          RpcController controller, MasterProtos.IsMasterRunningRequest request)
1509          throws ServiceException {
1510        return stub.isMasterRunning(controller, request);
1511      }
1512
1513      @Override
1514      public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller,
1515          MasterProtos.ModifyNamespaceRequest request)
1516      throws ServiceException {
1517        return stub.modifyNamespace(controller, request);
1518      }
1519
1520      @Override
1521      public MasterProtos.CreateNamespaceResponse createNamespace(
1522          RpcController controller,
1523          MasterProtos.CreateNamespaceRequest request) throws ServiceException {
1524        return stub.createNamespace(controller, request);
1525      }
1526
1527      @Override
1528      public MasterProtos.DeleteNamespaceResponse deleteNamespace(
1529          RpcController controller,
1530          MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
1531        return stub.deleteNamespace(controller, request);
1532      }
1533
1534      @Override
1535      public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
1536          RpcController controller,
1537          MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
1538        return stub.getNamespaceDescriptor(controller, request);
1539      }
1540
1541      @Override
1542      public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
1543          RpcController controller,
1544          MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
1545        return stub.listNamespaceDescriptors(controller, request);
1546      }
1547
1548      @Override
1549      public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
1550          RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
1551              throws ServiceException {
1552        return stub.listTableDescriptorsByNamespace(controller, request);
1553      }
1554
1555      @Override
1556      public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
1557          RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
1558              throws ServiceException {
1559        return stub.listTableNamesByNamespace(controller, request);
1560      }
1561
1562      @Override
1563      public MasterProtos.GetTableStateResponse getTableState(
1564              RpcController controller, MasterProtos.GetTableStateRequest request)
1565              throws ServiceException {
1566        return stub.getTableState(controller, request);
1567      }
1568
1569      @Override
1570      public void close() {
1571        release(this.mss);
1572      }
1573
1574      @Override
1575      public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
1576          RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
1577          throws ServiceException {
1578        return stub.getSchemaAlterStatus(controller, request);
1579      }
1580
1581      @Override
1582      public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(
1583          RpcController controller, MasterProtos.GetTableDescriptorsRequest request)
1584          throws ServiceException {
1585        return stub.getTableDescriptors(controller, request);
1586      }
1587
1588      @Override
1589      public MasterProtos.GetTableNamesResponse getTableNames(
1590          RpcController controller, MasterProtos.GetTableNamesRequest request)
1591          throws ServiceException {
1592        return stub.getTableNames(controller, request);
1593      }
1594
1595      @Override
1596      public MasterProtos.GetClusterStatusResponse getClusterStatus(
1597          RpcController controller, MasterProtos.GetClusterStatusRequest request)
1598          throws ServiceException {
1599        return stub.getClusterStatus(controller, request);
1600      }
1601
1602      @Override
1603      public MasterProtos.SetQuotaResponse setQuota(
1604          RpcController controller, MasterProtos.SetQuotaRequest request)
1605          throws ServiceException {
1606        return stub.setQuota(controller, request);
1607      }
1608
1609      @Override
1610      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
1611          RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
1612          throws ServiceException {
1613        return stub.getLastMajorCompactionTimestamp(controller, request);
1614      }
1615
1616      @Override
1617      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
1618          RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
1619          throws ServiceException {
1620        return stub.getLastMajorCompactionTimestampForRegion(controller, request);
1621      }
1622
1623      @Override
1624      public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
1625          IsBalancerEnabledRequest request) throws ServiceException {
1626        return stub.isBalancerEnabled(controller, request);
1627      }
1628
1629      @Override
1630      public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
1631        RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request)
1632        throws ServiceException {
1633        return stub.setSplitOrMergeEnabled(controller, request);
1634      }
1635
1636      @Override
1637      public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
1638        RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request)
1639              throws ServiceException {
1640        return stub.isSplitOrMergeEnabled(controller, request);
1641      }
1642
1643      @Override
1644      public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
1645          IsNormalizerEnabledRequest request) throws ServiceException {
1646        return stub.isNormalizerEnabled(controller, request);
1647      }
1648
1649      @Override
1650      public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
1651          SecurityCapabilitiesRequest request) throws ServiceException {
1652        return stub.getSecurityCapabilities(controller, request);
1653      }
1654
1655      @Override
1656      public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
1657          AddReplicationPeerRequest request) throws ServiceException {
1658        return stub.addReplicationPeer(controller, request);
1659      }
1660
1661      @Override
1662      public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
1663          RemoveReplicationPeerRequest request) throws ServiceException {
1664        return stub.removeReplicationPeer(controller, request);
1665      }
1666
1667      @Override
1668      public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
1669          EnableReplicationPeerRequest request) throws ServiceException {
1670        return stub.enableReplicationPeer(controller, request);
1671      }
1672
1673      @Override
1674      public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
1675          DisableReplicationPeerRequest request) throws ServiceException {
1676        return stub.disableReplicationPeer(controller, request);
1677      }
1678
1679      @Override
1680      public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
1681          ListDecommissionedRegionServersRequest request) throws ServiceException {
1682        return stub.listDecommissionedRegionServers(controller, request);
1683      }
1684
1685      @Override
1686      public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
1687          DecommissionRegionServersRequest request) throws ServiceException {
1688        return stub.decommissionRegionServers(controller, request);
1689      }
1690
1691      @Override
1692      public RecommissionRegionServerResponse recommissionRegionServer(
1693          RpcController controller, RecommissionRegionServerRequest request)
1694          throws ServiceException {
1695        return stub.recommissionRegionServer(controller, request);
1696      }
1697
1698      @Override
1699      public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
1700          GetReplicationPeerConfigRequest request) throws ServiceException {
1701        return stub.getReplicationPeerConfig(controller, request);
1702      }
1703
1704      @Override
1705      public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig(
1706          RpcController controller, UpdateReplicationPeerConfigRequest request)
1707          throws ServiceException {
1708        return stub.updateReplicationPeerConfig(controller, request);
1709      }
1710
1711      @Override
1712      public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
1713          ListReplicationPeersRequest request) throws ServiceException {
1714        return stub.listReplicationPeers(controller, request);
1715      }
1716
1717      @Override
1718      public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
1719          RpcController controller, GetSpaceQuotaRegionSizesRequest request)
1720          throws ServiceException {
1721        return stub.getSpaceQuotaRegionSizes(controller, request);
1722      }
1723
1724      @Override
1725      public GetQuotaStatesResponse getQuotaStates(
1726          RpcController controller, GetQuotaStatesRequest request) throws ServiceException {
1727        return stub.getQuotaStates(controller, request);
1728      }
1729
1730      @Override
1731      public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller,
1732          MasterProtos.ClearDeadServersRequest request) throws ServiceException {
1733        return stub.clearDeadServers(controller, request);
1734      }
1735    };
1736  }
1737
1738  private static void release(MasterServiceState mss) {
1739    if (mss != null && mss.connection != null) {
1740      ((ConnectionImplementation)mss.connection).releaseMaster(mss);
1741    }
1742  }
1743
1744  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
1745    if (mss.getStub() == null){
1746      return false;
1747    }
1748    try {
1749      return mss.isMasterRunning();
1750    } catch (UndeclaredThrowableException e) {
1751      // It's somehow messy, but we can receive exceptions such as
1752      //  java.net.ConnectException but they're not declared. So we catch it...
1753      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
1754      return false;
1755    } catch (IOException se) {
1756      LOG.warn("Checking master connection", se);
1757      return false;
1758    }
1759  }
1760
1761  void releaseMaster(MasterServiceState mss) {
1762    if (mss.getStub() == null) {
1763      return;
1764    }
1765    synchronized (masterLock) {
1766      --mss.userCount;
1767    }
1768  }
1769
1770  private void closeMasterService(MasterServiceState mss) {
1771    if (mss.getStub() != null) {
1772      LOG.info("Closing master protocol: " + mss);
1773      mss.clearStub();
1774    }
1775    mss.userCount = 0;
1776  }
1777
1778  /**
1779   * Immediate close of the shared master. Can be by the delayed close or when closing the
1780   * connection itself.
1781   */
1782  private void closeMaster() {
1783    synchronized (masterLock) {
1784      closeMasterService(masterServiceState);
1785    }
1786  }
1787
1788  void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) {
1789    HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
1790    cacheLocation(hri.getTable(), source, newHrl);
1791  }
1792
1793  @Override
1794  public void deleteCachedRegionLocation(final HRegionLocation location) {
1795    metaCache.clearCache(location);
1796  }
1797
1798  /**
1799   * Update the location with the new value (if the exception is a RegionMovedException)
1800   * or delete it from the cache. Does nothing if we can be sure from the exception that
1801   * the location is still accurate, or if the cache has already been updated.
1802   * @param exception an object (to simplify user code) on which we will try to find a nested
1803   *   or wrapped or both RegionMovedException
1804   * @param source server that is the source of the location update.
1805   */
1806  @Override
1807  public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
1808    final Object exception, final ServerName source) {
1809    if (rowkey == null || tableName == null) {
1810      LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
1811          ", tableName=" + (tableName == null ? "null" : tableName));
1812      return;
1813    }
1814
1815    if (source == null) {
1816      // This should not happen, but let's secure ourselves.
1817      return;
1818    }
1819
1820    if (regionName == null) {
1821      // we do not know which region, so just remove the cache entry for the row and server
1822      if (metrics != null) {
1823        metrics.incrCacheDroppingExceptions(exception);
1824      }
1825      metaCache.clearCache(tableName, rowkey, source);
1826      return;
1827    }
1828
1829    // Is it something we have already updated?
1830    final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
1831    HRegionLocation oldLocation = null;
1832    if (oldLocations != null) {
1833      oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
1834    }
1835    if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
1836      // There is no such location in the cache (it's been removed already) or
1837      // the cache has already been refreshed with a different location.  => nothing to do
1838      return;
1839    }
1840
1841    RegionInfo regionInfo = oldLocation.getRegion();
1842    Throwable cause = ClientExceptionsUtil.findException(exception);
1843    if (cause != null) {
1844      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
1845        // We know that the region is still on this region server
1846        return;
1847      }
1848
1849      if (cause instanceof RegionMovedException) {
1850        RegionMovedException rme = (RegionMovedException) cause;
1851        if (LOG.isTraceEnabled()) {
1852          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
1853              rme.getHostname() + ":" + rme.getPort() +
1854              " according to " + source.getAddress());
1855        }
1856        // We know that the region is not anymore on this region server, but we know
1857        //  the new location.
1858        updateCachedLocation(
1859            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
1860        return;
1861      }
1862    }
1863
1864    if (metrics != null) {
1865      metrics.incrCacheDroppingExceptions(exception);
1866    }
1867
1868    // If we're here, it means that can cannot be sure about the location, so we remove it from
1869    // the cache. Do not send the source because source can be a new server in the same host:port
1870    metaCache.clearCache(regionInfo);
1871  }
1872
1873  @Override
1874  public AsyncProcess getAsyncProcess() {
1875    return asyncProcess;
1876  }
1877
1878  @Override
1879  public ServerStatisticTracker getStatisticsTracker() {
1880    return this.stats;
1881  }
1882
1883  @Override
1884  public ClientBackoffPolicy getBackoffPolicy() {
1885    return this.backoffPolicy;
1886  }
1887
1888  /*
1889   * Return the number of cached region for a table. It will only be called
1890   * from a unit test.
1891   */
1892  @VisibleForTesting
1893  int getNumberOfCachedRegionLocations(final TableName tableName) {
1894    return metaCache.getNumberOfCachedRegionLocations(tableName);
1895  }
1896
1897  @Override
1898  public void abort(final String msg, Throwable t) {
1899    if (t != null) {
1900      LOG.error(HBaseMarkers.FATAL, msg, t);
1901    } else {
1902      LOG.error(HBaseMarkers.FATAL, msg);
1903    }
1904    this.aborted = true;
1905    close();
1906    this.closed = true;
1907  }
1908
1909  @Override
1910  public boolean isClosed() {
1911    return this.closed;
1912  }
1913
1914  @Override
1915  public boolean isAborted(){
1916    return this.aborted;
1917  }
1918
1919  @Override
1920  public int getCurrentNrHRS() throws IOException {
1921    return get(this.registry.getCurrentNrHRS());
1922  }
1923
1924  @Override
1925  public void close() {
1926    if (this.closed) {
1927      return;
1928    }
1929    closeMaster();
1930    shutdownPools();
1931    if (this.metrics != null) {
1932      this.metrics.shutdown();
1933    }
1934    this.closed = true;
1935    registry.close();
1936    this.stubs.clear();
1937    if (clusterStatusListener != null) {
1938      clusterStatusListener.close();
1939    }
1940    if (rpcClient != null) {
1941      rpcClient.close();
1942    }
1943  }
1944
1945  /**
1946   * Close the connection for good. On the off chance that someone is unable to close
1947   * the connection, perhaps because it bailed out prematurely, the method
1948   * below will ensure that this instance is cleaned up.
1949   * Caveat: The JVM may take an unknown amount of time to call finalize on an
1950   * unreachable object, so our hope is that every consumer cleans up after
1951   * itself, like any good citizen.
1952   */
1953  @Override
1954  protected void finalize() throws Throwable {
1955    super.finalize();
1956    close();
1957  }
1958
1959  @Override
1960  public NonceGenerator getNonceGenerator() {
1961    return nonceGenerator;
1962  }
1963
1964  @Override
1965  public TableState getTableState(TableName tableName) throws IOException {
1966    checkClosed();
1967    TableState tableState = MetaTableAccessor.getTableState(this, tableName);
1968    if (tableState == null) {
1969      throw new TableNotFoundException(tableName);
1970    }
1971    return tableState;
1972  }
1973
1974  @Override
1975  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
1976    return RpcRetryingCallerFactory
1977        .instantiate(conf, this.interceptor, this.getStatisticsTracker());
1978  }
1979
1980  @Override
1981  public boolean hasCellBlockSupport() {
1982    return this.rpcClient.hasCellBlockSupport();
1983  }
1984
1985  @Override
1986  public ConnectionConfiguration getConnectionConfiguration() {
1987    return this.connectionConfig;
1988  }
1989
1990  @Override
1991  public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
1992    return this.rpcCallerFactory;
1993  }
1994
1995  @Override
1996  public RpcControllerFactory getRpcControllerFactory() {
1997    return this.rpcControllerFactory;
1998  }
1999
2000  private static <T> T get(CompletableFuture<T> future) throws IOException {
2001    try {
2002      return future.get();
2003    } catch (InterruptedException e) {
2004      Thread.currentThread().interrupt();
2005      throw (IOException) new InterruptedIOException().initCause(e);
2006    } catch (ExecutionException e) {
2007      Throwable cause = e.getCause();
2008      Throwables.propagateIfPossible(cause, IOException.class);
2009      throw new IOException(cause);
2010    }
2011  }
2012}