001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
024import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
025import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
026import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsentEx;
027
028import edu.umd.cs.findbugs.annotations.Nullable;
029import java.io.Closeable;
030import java.io.IOException;
031import java.io.InterruptedIOException;
032import java.lang.reflect.UndeclaredThrowableException;
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.Date;
036import java.util.List;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.LinkedBlockingQueue;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.atomic.AtomicInteger;
047import java.util.concurrent.locks.ReentrantLock;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CallQueueTooBigException;
050import org.apache.hadoop.hbase.DoNotRetryIOException;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.MasterNotRunningException;
054import org.apache.hadoop.hbase.MetaTableAccessor;
055import org.apache.hadoop.hbase.RegionLocations;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.TableNotEnabledException;
059import org.apache.hadoop.hbase.TableNotFoundException;
060import org.apache.hadoop.hbase.ZooKeeperConnectionException;
061import org.apache.hadoop.hbase.client.Scan.ReadType;
062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
063import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
064import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
065import org.apache.hadoop.hbase.exceptions.RegionMovedException;
066import org.apache.hadoop.hbase.ipc.RpcClient;
067import org.apache.hadoop.hbase.ipc.RpcClientFactory;
068import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
069import org.apache.hadoop.hbase.log.HBaseMarkers;
070import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
071import org.apache.hadoop.hbase.security.User;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.ExceptionUtil;
075import org.apache.hadoop.hbase.util.Pair;
076import org.apache.hadoop.hbase.util.ReflectionUtils;
077import org.apache.hadoop.hbase.util.Threads;
078import org.apache.hadoop.ipc.RemoteException;
079import org.apache.yetus.audience.InterfaceAudience;
080import org.apache.zookeeper.KeeperException;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
085import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
086import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
087import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
088import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
089import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
090import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
129
130/**
131 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
132 * Encapsulates connection to zookeeper and regionservers.
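 * <p>
 * Instances are normally obtained via {@link ConnectionFactory} rather than constructed
 * directly. A minimal, illustrative sketch of typical client usage:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *     Table table = connection.getTable(TableName.valueOf("test_table"))) {
 *   Result result = table.get(new Get(Bytes.toBytes("row1")));
 * }
 * }</pre>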
133 */
134@edu.umd.cs.findbugs.annotations.SuppressWarnings(
135    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
    justification="Access to the concurrent hash map is under a lock so should be fine.")
137@InterfaceAudience.Private
138class ConnectionImplementation implements ClusterConnection, Closeable {
139  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
140  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
141
142  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
143
144  private final boolean hostnamesCanChange;
145  private final long pause;
  private final long pauseForCQTBE; // pause for CallQueueTooBigException, if specified
147  private boolean useMetaReplicas;
148  private final int metaReplicaCallTimeoutScanInMicroSecond;
149  private final int numTries;
150  final int rpcTimeout;
151
152  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
154   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
155   */
156  private static volatile NonceGenerator nonceGenerator = null;
157  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
158  private static final Object nonceGeneratorCreateLock = new Object();
159
160  private final AsyncProcess asyncProcess;
161  // single tracker per connection
162  private final ServerStatisticTracker stats;
163
164  private volatile boolean closed;
165  private volatile boolean aborted;
166
167  // package protected for the tests
168  ClusterStatusListener clusterStatusListener;
169
170  private final Object metaRegionLock = new Object();
171
172  private final Object masterLock = new Object();
173
174  // thread executor shared by all Table instances created
175  // by this connection
176  private volatile ExecutorService batchPool = null;
177  // meta thread executor shared by all Table instances created
178  // by this connection
179  private volatile ExecutorService metaLookupPool = null;
180  private volatile boolean cleanupPool = false;
181
182  private final Configuration conf;
183
184  // cache the configuration value for tables so that we can avoid calling
185  // the expensive Configuration to fetch the value multiple times.
186  private final ConnectionConfiguration connectionConfig;
187
188  // Client rpc instance.
189  private final RpcClient rpcClient;
190
191  private final MetaCache metaCache;
192  private final MetricsConnection metrics;
193
194  protected User user;
195
196  private final RpcRetryingCallerFactory rpcCallerFactory;
197
198  private final RpcControllerFactory rpcControllerFactory;
199
200  private final RetryingCallerInterceptor interceptor;
201
202  /**
203   * Cluster registry of basic info such as clusterid and meta region location.
204   */
205  private final AsyncRegistry registry;
206
207  private final ClientBackoffPolicy backoffPolicy;
208
209  /**
210   * Allow setting an alternate BufferedMutator implementation via
211   * config. If null, use default.
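   * <p>
   * Illustrative only (the implementation class name below is hypothetical):
   * <pre>{@code
   * conf.set(BufferedMutator.CLASSNAME_KEY, "org.example.MyBufferedMutator");
   * }</pre>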
212   */
213  private final String alternateBufferedMutatorClassName;
214
215  /** lock guards against multiple threads trying to query the meta region at the same time */
216  private final ReentrantLock userRegionLock = new ReentrantLock();
217
  /**
   * Constructor.
   * @param conf Configuration object
   * @param pool ExecutorService to be used for batch operations; if null, a default pool is
   *          created lazily
   * @param user the user for this connection
   */
222  ConnectionImplementation(Configuration conf,
223                           ExecutorService pool, User user) throws IOException {
224    this.conf = conf;
225    this.user = user;
226    this.batchPool = pool;
227    this.connectionConfig = new ConnectionConfiguration(conf);
228    this.closed = false;
229    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
230        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
231    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
232    if (configuredPauseForCQTBE < pause) {
233      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
234          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
235          + ", will use " + pause + " instead.");
236      this.pauseForCQTBE = pause;
237    } else {
238      this.pauseForCQTBE = configuredPauseForCQTBE;
239    }
240    this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
241      HConstants.DEFAULT_USE_META_REPLICAS);
242    this.metaReplicaCallTimeoutScanInMicroSecond =
243        connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
244
    // how many times to try; one more than the configured number of *retries*
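    // e.g. with hbase.client.retries.number = 10 this works out to 11 attempts (illustrative)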
246    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
247    this.rpcTimeout = conf.getInt(
248        HConstants.HBASE_RPC_TIMEOUT_KEY,
249        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
250    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
251      synchronized (nonceGeneratorCreateLock) {
252        if (nonceGenerator == null) {
253          nonceGenerator = PerClientRandomNonceGenerator.get();
254        }
255      }
256    } else {
257      nonceGenerator = NO_NONCE_GENERATOR;
258    }
259
260    this.stats = ServerStatisticTracker.create(conf);
261    this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
262    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
263    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
264    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
265    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
266    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
267      this.metrics = new MetricsConnection(this);
268    } else {
269      this.metrics = null;
270    }
271    this.metaCache = new MetaCache(this.metrics);
272
273    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
274        HConstants.STATUS_PUBLISHED_DEFAULT);
275    this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
276    Class<? extends ClusterStatusListener.Listener> listenerClass =
277        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
278            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
279            ClusterStatusListener.Listener.class);
280
281    // Is there an alternate BufferedMutator to use?
282    this.alternateBufferedMutatorClassName =
283        this.conf.get(BufferedMutator.CLASSNAME_KEY);
284
285    try {
286      this.registry = AsyncRegistryFactory.getRegistry(conf);
287      retrieveClusterId();
288
289      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
290
291      // Do we publish the status?
292      if (shouldListen) {
293        if (listenerClass == null) {
294          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
295              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
296        } else {
297          clusterStatusListener = new ClusterStatusListener(
298              new ClusterStatusListener.DeadServerHandler() {
299                @Override
300                public void newDead(ServerName sn) {
301                  clearCaches(sn);
302                  rpcClient.cancelConnections(sn);
303                }
304              }, conf, listenerClass);
305        }
306      }
307    } catch (Throwable e) {
308      // avoid leaks: registry, rpcClient, ...
309      LOG.debug("connection construction failed", e);
310      close();
311      throw e;
312    }
313  }
314
  /**
   * @param useMetaReplicas whether scans of hbase:meta may read from meta replicas
   */
318  @VisibleForTesting
319  void setUseMetaReplicas(final boolean useMetaReplicas) {
320    this.useMetaReplicas = useMetaReplicas;
321  }
322
323  /**
324   * @param conn The connection for which to replace the generator.
325   * @param cnm Replaces the nonce generator used, for testing.
326   * @return old nonce generator.
327   */
328  @VisibleForTesting
329  static NonceGenerator injectNonceGeneratorForTesting(
330      ClusterConnection conn, NonceGenerator cnm) {
331    ConnectionImplementation connImpl = (ConnectionImplementation)conn;
332    NonceGenerator ng = connImpl.getNonceGenerator();
333    LOG.warn("Nonce generator is being replaced by test code for "
334      + cnm.getClass().getName());
335    nonceGenerator = cnm;
336    return ng;
337  }
338
339  @Override
340  public Table getTable(TableName tableName) throws IOException {
341    return getTable(tableName, getBatchPool());
342  }
343
344  @Override
345  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
346    return new TableBuilderBase(tableName, connectionConfig) {
347
348      @Override
349      public Table build() {
350        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
351            rpcControllerFactory, pool);
352      }
353    };
354  }
355
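  // Illustrative sketch (table, family, and qualifier names are hypothetical) of how callers
  // typically drive the params-based factory below:
  //
  //   BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"))
  //       .writeBufferSize(4 * 1024 * 1024);
  //   try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
  //     mutator.mutate(new Put(Bytes.toBytes("row1"))
  //         .addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")));
  //   }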
356  @Override
357  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
358    if (params.getTableName() == null) {
359      throw new IllegalArgumentException("TableName cannot be null.");
360    }
361    if (params.getPool() == null) {
362      params.pool(HTable.getDefaultExecutor(getConfiguration()));
363    }
364    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
365      params.writeBufferSize(connectionConfig.getWriteBufferSize());
366    }
367    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
368      params.setWriteBufferPeriodicFlushTimeoutMs(
369              connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
370    }
371    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
372      params.setWriteBufferPeriodicFlushTimerTickMs(
373              connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
374    }
375    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
376      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
377    }
    // Look to see if an alternate BufferedMutator implementation is wanted.
    // Look in params and in config. If null, use default.
380    String implementationClassName = params.getImplementationClassName();
381    if (implementationClassName == null) {
382      implementationClassName = this.alternateBufferedMutatorClassName;
383    }
384    if (implementationClassName == null) {
385      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
386    }
387    try {
388      return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
389          this, rpcCallerFactory, rpcControllerFactory, params);
390    } catch (ClassNotFoundException e) {
391      throw new RuntimeException(e);
392    }
393  }
394
395  @Override
396  public BufferedMutator getBufferedMutator(TableName tableName) {
397    return getBufferedMutator(new BufferedMutatorParams(tableName));
398  }
399
400  @Override
401  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
402    return new HRegionLocator(tableName, this);
403  }
404
405  @Override
406  public Admin getAdmin() throws IOException {
407    return new HBaseAdmin(this);
408  }
409
410  @Override
411  public Hbck getHbck() throws IOException {
412    return getHbck(get(registry.getMasterAddress()));
413  }
414
415  @Override
416  public Hbck getHbck(ServerName masterServer) throws IOException {
417    checkClosed();
418    if (isDeadServer(masterServer)) {
419      throw new RegionServerStoppedException(masterServer + " is dead.");
420    }
421    String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
422      masterServer, this.hostnamesCanChange);
423
424    return new HBaseHbck(
425      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
426        BlockingRpcChannel channel =
427          this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
428        return MasterProtos.HbckService.newBlockingStub(channel);
429      }), rpcControllerFactory);
430  }
431
432  @Override
433  public MetricsConnection getConnectionMetrics() {
434    return this.metrics;
435  }
436
437  private ExecutorService getBatchPool() {
438    if (batchPool == null) {
439      synchronized (this) {
440        if (batchPool == null) {
441          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
442          this.batchPool = getThreadPool(threads, threads, "-shared", null);
443          this.cleanupPool = true;
444        }
445      }
446    }
447    return this.batchPool;
448  }
449
450  private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
451      BlockingQueue<Runnable> passedWorkQueue) {
    // A value of 0 means "unset"; size the pool from the number of available processors.
453    if (maxThreads == 0) {
454      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
455    }
456    if (coreThreads == 0) {
457      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
458    }
459    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
460    BlockingQueue<Runnable> workQueue = passedWorkQueue;
461    if (workQueue == null) {
462      workQueue =
463        new LinkedBlockingQueue<>(maxThreads *
464            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
465                HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
466      coreThreads = maxThreads;
467    }
468    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
469        coreThreads,
470        maxThreads,
471        keepAliveTime,
472        TimeUnit.SECONDS,
473        workQueue,
474        Threads.newDaemonThreadFactory(toString() + nameHint));
475    tpe.allowCoreThreadTimeOut(true);
476    return tpe;
477  }
478
479  private ExecutorService getMetaLookupPool() {
480    if (this.metaLookupPool == null) {
481      synchronized (this) {
482        if (this.metaLookupPool == null) {
          // Some of the threads would be used for meta replicas.
          // To start with, up to the configured max number of threads can hit meta (including
          // replicas). After that, requests get queued up in the passed queue, and only after
          // the queue is full will a new thread be started.
487          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
488          this.metaLookupPool = getThreadPool(
489             threads,
490             threads,
491             "-metaLookup-shared-", new LinkedBlockingQueue<>());
492        }
493      }
494    }
495    return this.metaLookupPool;
496  }
497
498  protected ExecutorService getCurrentMetaLookupPool() {
499    return metaLookupPool;
500  }
501
502  protected ExecutorService getCurrentBatchPool() {
503    return batchPool;
504  }
505
506  private void shutdownPools() {
507    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
508      shutdownBatchPool(this.batchPool);
509    }
510    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
511      shutdownBatchPool(this.metaLookupPool);
512    }
513  }
514
515  private void shutdownBatchPool(ExecutorService pool) {
516    pool.shutdown();
517    try {
518      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
519        pool.shutdownNow();
520      }
521    } catch (InterruptedException e) {
522      pool.shutdownNow();
523    }
524  }
525
526  /**
527   * For tests only.
528   */
529  @VisibleForTesting
530  RpcClient getRpcClient() {
531    return rpcClient;
532  }
533
534  /**
535   * An identifier that will remain the same for a given connection.
536   */
537  @Override
  public String toString() {
539    return "hconnection-0x" + Integer.toHexString(hashCode());
540  }
541
542  protected String clusterId = null;
543
544  protected void retrieveClusterId() {
545    if (clusterId != null) {
546      return;
547    }
548    try {
549      this.clusterId = this.registry.getClusterId().get();
550    } catch (InterruptedException | ExecutionException e) {
      LOG.warn("Retrieving cluster id failed", e);
552    }
553    if (clusterId == null) {
554      clusterId = HConstants.CLUSTER_ID_DEFAULT;
555      LOG.debug("clusterid came back null, using default " + clusterId);
556    }
557  }
558
559  @Override
560  public Configuration getConfiguration() {
561    return this.conf;
562  }
563
564  private void checkClosed() throws DoNotRetryIOException {
565    if (this.closed) {
566      throw new DoNotRetryIOException(toString() + " closed");
567    }
568  }
569
570  /**
571   * @return true if the master is running, throws an exception otherwise
572   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
573   * @deprecated this has been deprecated without a replacement
574   */
575  @Deprecated
576  @Override
577  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
578    // When getting the master connection, we check it's running,
579    // so if there is no exception, it means we've been able to get a
580    // connection on a running master
581    MasterKeepAliveConnection m;
582    try {
583      m = getKeepAliveMasterService();
584    } catch (IOException e) {
585      throw new MasterNotRunningException(e);
586    }
587    m.close();
588    return true;
589  }
590
591  @Override
592  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
593      boolean reload) throws IOException {
594    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
595  }
596
597
598  @Override
599  public boolean isTableEnabled(TableName tableName) throws IOException {
600    return getTableState(tableName).inStates(TableState.State.ENABLED);
601  }
602
603  @Override
604  public boolean isTableDisabled(TableName tableName) throws IOException {
605    return getTableState(tableName).inStates(TableState.State.DISABLED);
606  }
607
608  @Override
609  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
610      throws IOException {
611    checkClosed();
612    try {
613      if (!isTableEnabled(tableName)) {
614        LOG.debug("Table {} not enabled", tableName);
615        return false;
616      }
617      List<Pair<RegionInfo, ServerName>> locations =
618        MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);
619
620      int notDeployed = 0;
621      int regionCount = 0;
622      for (Pair<RegionInfo, ServerName> pair : locations) {
623        RegionInfo info = pair.getFirst();
624        if (pair.getSecond() == null) {
625          LOG.debug("Table {} has not deployed region {}", tableName,
626              pair.getFirst().getEncodedName());
627          notDeployed++;
628        } else if (splitKeys != null
629            && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
630          for (byte[] splitKey : splitKeys) {
631            // Just check if the splitkey is available
632            if (Bytes.equals(info.getStartKey(), splitKey)) {
633              regionCount++;
634              break;
635            }
636          }
637        } else {
638          // Always empty start row should be counted
639          regionCount++;
640        }
641      }
642      if (notDeployed > 0) {
643        if (LOG.isDebugEnabled()) {
644          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
645        }
646        return false;
647      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
648        if (LOG.isDebugEnabled()) {
649          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
650              splitKeys.length + 1, regionCount);
651        }
652        return false;
653      } else {
654        LOG.trace("Table {} should be available", tableName);
655        return true;
656      }
657    } catch (TableNotFoundException tnfe) {
658      LOG.warn("Table {} does not exist", tableName);
659      return false;
660    }
661  }
662
663  @Override
664  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
665    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
666      RegionInfo.getStartKey(regionName), false, true);
667    return locations == null ? null : locations.getRegionLocation();
668  }
669
670  private boolean isDeadServer(ServerName sn) {
671    if (clusterStatusListener == null) {
672      return false;
673    } else {
674      return clusterStatusListener.isDeadServer(sn);
675    }
676  }
677
678  @Override
679  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
680    return locateRegions(tableName, false, true);
681  }
682
683  @Override
684  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
685      boolean offlined) throws IOException {
686    List<RegionInfo> regions;
687    if (TableName.isMetaTableName(tableName)) {
688      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
689    } else {
690      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
691    }
692    List<HRegionLocation> locations = new ArrayList<>();
693    for (RegionInfo regionInfo : regions) {
694      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
695        continue;
696      }
697      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
698      if (list != null) {
699        for (HRegionLocation loc : list.getRegionLocations()) {
700          if (loc != null) {
701            locations.add(loc);
702          }
703        }
704      }
705    }
706    return locations;
707  }
708
709  @Override
710  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
711      throws IOException {
712    RegionLocations locations = locateRegion(tableName, row, true, true);
713    return locations == null ? null : locations.getRegionLocation();
714  }
715
716  @Override
717  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
718      throws IOException {
719    RegionLocations locations =
720      relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
721    return locations == null ? null
722      : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
723  }
724
725  @Override
726  public RegionLocations relocateRegion(final TableName tableName,
      final byte[] row, int replicaId) throws IOException {
    // Since this is an explicit request not to use any caching, locating regions of
    // disabled tables is not desirable. This ensures that an exception is thrown the
    // first time a disabled table is interacted with.
731    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
732      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
733    }
734
735    return locateRegion(tableName, row, false, true, replicaId);
736  }
737
738  @Override
739  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
740      boolean retry) throws IOException {
741    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
742  }
743
744  @Override
745  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
746      boolean retry, int replicaId) throws IOException {
747    checkClosed();
748    if (tableName == null || tableName.getName().length == 0) {
749      throw new IllegalArgumentException("table name cannot be null or zero length");
750    }
751    if (tableName.equals(TableName.META_TABLE_NAME)) {
752      return locateMeta(tableName, useCache, replicaId);
753    } else {
754      // Region not in the cache - have to go to the meta RS
755      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
756    }
757  }
758
759  private RegionLocations locateMeta(final TableName tableName,
760      boolean useCache, int replicaId) throws IOException {
761    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
762    // zookeeper with one request for every region lookup. We cache the META with empty row
763    // key in MetaCache.
764    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
765    RegionLocations locations = null;
766    if (useCache) {
767      locations = getCachedLocation(tableName, metaCacheKey);
768      if (locations != null && locations.getRegionLocation(replicaId) != null) {
769        return locations;
770      }
771    }
772
773    // only one thread should do the lookup.
774    synchronized (metaRegionLock) {
775      // Check the cache again for a hit in case some other thread made the
776      // same query while we were waiting on the lock.
777      if (useCache) {
778        locations = getCachedLocation(tableName, metaCacheKey);
779        if (locations != null && locations.getRegionLocation(replicaId) != null) {
780          return locations;
781        }
782      }
783
784      // Look up from zookeeper
785      locations = get(this.registry.getMetaRegionLocation());
786      if (locations != null) {
787        cacheLocation(tableName, locations);
788      }
789    }
790    return locations;
791  }
792
793  /**
794   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
795   * seeking.
796   */
797  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
798      boolean retry, int replicaId) throws IOException {
799    // If we are supposed to be using the cache, look in the cache to see if we already have the
800    // region.
801    if (useCache) {
802      RegionLocations locations = getCachedLocation(tableName, row);
803      if (locations != null && locations.getRegionLocation(replicaId) != null) {
804        return locations;
805      }
806    }
807    // build the key of the meta region we should be looking for.
808    // the extra 9's on the end are necessary to allow "exact" matches
809    // without knowing the precise region names.
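    // e.g. for table "t1" and row "r1" the scan start key looks roughly like
    // "t1,r1,99999999999999" (illustrative; the trailing digits come from HConstants.NINES).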
810    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
811    byte[] metaStopKey =
812      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
813    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
814      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
815      .setReadType(ReadType.PREAD);
816    if (this.useMetaReplicas) {
817      s.setConsistency(Consistency.TIMELINE);
818    }
819    int maxAttempts = (retry ? numTries : 1);
820    boolean relocateMeta = false;
821    for (int tries = 0; ; tries++) {
822      if (tries >= maxAttempts) {
823        throw new NoServerForRegionException("Unable to find region for "
824            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
825      }
826      if (useCache) {
827        RegionLocations locations = getCachedLocation(tableName, row);
828        if (locations != null && locations.getRegionLocation(replicaId) != null) {
829          return locations;
830        }
831      } else {
832        // If we are not supposed to be using the cache, delete any existing cached location
833        // so it won't interfere.
834        // We are only supposed to clean the cache for the specific replicaId
835        metaCache.clearCache(tableName, row, replicaId);
836      }
837      // Query the meta region
838      long pauseBase = this.pause;
839      userRegionLock.lock();
840      try {
        if (useCache) { // re-check the cache after acquiring the lock
842          RegionLocations locations = getCachedLocation(tableName, row);
843          if (locations != null && locations.getRegionLocation(replicaId) != null) {
844            return locations;
845          }
846        }
847        if (relocateMeta) {
848          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
849            RegionInfo.DEFAULT_REPLICA_ID);
850        }
851        s.resetMvccReadPoint();
852        try (ReversedClientScanner rcs =
853          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
854            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
855          boolean tableNotFound = true;
856          for (;;) {
857            Result regionInfoRow = rcs.next();
858            if (regionInfoRow == null) {
859              if (tableNotFound) {
860                throw new TableNotFoundException(tableName);
861              } else {
862                throw new IOException(
863                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
864              }
865            }
866            tableNotFound = false;
867            // convert the row result into the HRegionLocation we need!
868            RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
869            if (locations == null || locations.getRegionLocation(replicaId) == null) {
870              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
871            }
872            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
873            if (regionInfo == null) {
874              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
875                ", row=" + regionInfoRow);
876            }
877            // See HBASE-20182. It is possible that we locate to a split parent even after the
878            // children are online, so here we need to skip this region and go to the next one.
879            if (regionInfo.isSplitParent()) {
880              continue;
881            }
882            if (regionInfo.isOffline()) {
883              throw new RegionOfflineException("Region offline; disable table call? " +
884                  regionInfo.getRegionNameAsString());
885            }
            // It is possible that the split children have not come online yet and we have skipped
            // the parent in the above condition, so we may have already reached a region which
            // does not contain the row we are looking for.
889            if (!regionInfo.containsRow(row)) {
890              throw new IOException(
891                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
892            }
893            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
894            if (serverName == null) {
895              throw new NoServerForRegionException("No server address listed in " +
896                TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
897                " containing row " + Bytes.toStringBinary(row));
898            }
899            if (isDeadServer(serverName)) {
900              throw new RegionServerStoppedException(
901                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
902                  " is managed by the server " + serverName + ", but it is dead.");
903            }
904            // Instantiate the location
905            cacheLocation(tableName, locations);
906            return locations;
907          }
908        }
909      } catch (TableNotFoundException e) {
910        // if we got this error, probably means the table just plain doesn't
911        // exist. rethrow the error immediately. this should always be coming
912        // from the HTable constructor.
913        throw e;
914      } catch (IOException e) {
915        ExceptionUtil.rethrowIfInterrupt(e);
916        if (e instanceof RemoteException) {
917          e = ((RemoteException)e).unwrapRemoteException();
918        }
919        if (e instanceof CallQueueTooBigException) {
920          // Give a special check on CallQueueTooBigException, see #HBASE-17114
921          pauseBase = this.pauseForCQTBE;
922        }
923        if (tries < maxAttempts - 1) {
          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
            "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts,
            ConnectionUtils.getPauseTime(pauseBase, tries), e);
926        } else {
927          throw e;
928        }
929        // Only relocate the parent region if necessary
930        relocateMeta =
931          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
932      } finally {
933        userRegionLock.unlock();
934      }
935      try{
936        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
937      } catch (InterruptedException e) {
        throw new InterruptedIOException("Giving up trying to locate region in " +
          "meta: thread is interrupted.");
940      }
941    }
942  }
943
944  /**
945   * Put a newly discovered HRegionLocation into the cache.
946   * @param tableName The table name.
947   * @param location the new location
948   */
949  @Override
950  public void cacheLocation(final TableName tableName, final RegionLocations location) {
951    metaCache.cacheLocation(tableName, location);
952  }
953
954  /**
955   * Search the cache for a location that fits our table and row key.
956   * Return null if no suitable region is located.
957   * @return Null or region location found in cache.
958   */
959  RegionLocations getCachedLocation(final TableName tableName,
960      final byte [] row) {
961    return metaCache.getCachedLocation(tableName, row);
962  }
963
964  public void clearRegionCache(final TableName tableName, byte[] row) {
965    metaCache.clearCache(tableName, row);
966  }
967
  /**
   * Delete all cached entries that map to the given server, regardless of table.
   */
971  @Override
972  public void clearCaches(final ServerName serverName) {
973    metaCache.clearCache(serverName);
974  }
975
976  @Override
977  public void clearRegionCache() {
978    metaCache.clearCache();
979  }
980
981  @Override
982  public void clearRegionCache(final TableName tableName) {
983    metaCache.clearCache(tableName);
984  }
985
986  /**
987   * Put a newly discovered HRegionLocation into the cache.
988   * @param tableName The table name.
989   * @param source the source of the new location, if it's not coming from meta
990   * @param location the new location
991   */
992  private void cacheLocation(final TableName tableName, final ServerName source,
993      final HRegionLocation location) {
994    metaCache.cacheLocation(tableName, source, location);
995  }
996
997  // Map keyed by service name + regionserver to service stub implementation
998  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
999
1000  /**
1001   * State of the MasterService connection/setup.
1002   */
1003  static class MasterServiceState {
1004    Connection connection;
1005
1006    MasterProtos.MasterService.BlockingInterface stub;
1007    int userCount;
1008
1009    MasterServiceState(final Connection connection) {
1010      super();
1011      this.connection = connection;
1012    }
1013
1014    @Override
1015    public String toString() {
1016      return "MasterService";
1017    }
1018
1019    Object getStub() {
1020      return this.stub;
1021    }
1022
1023    void clearStub() {
1024      this.stub = null;
1025    }
1026
1027    boolean isMasterRunning() throws IOException {
1028      MasterProtos.IsMasterRunningResponse response = null;
1029      try {
1030        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1031      } catch (Exception e) {
1032        throw ProtobufUtil.handleRemoteException(e);
1033      }
      return response != null ? response.getIsMasterRunning() : false;
1035    }
1036  }
1037
1038  /**
1039   * The record of errors for servers.
1040   */
1041  static class ServerErrorTracker {
1042    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
1043    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>();
1044    private final long canRetryUntil;
    private final int maxTries; // max number of attempts
1046    private final long startTrackingTime;
1047
1048    /**
1049     * Constructor
     * @param timeout how long to wait before timing out, in milliseconds
1051     * @param maxTries how many times to try
1052     */
1053    public ServerErrorTracker(long timeout, int maxTries) {
1054      this.maxTries = maxTries;
1055      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
1056      this.startTrackingTime = new Date().getTime();
1057    }
1058
1059    /**
     * We stop retrying when we have exhausted BOTH the number of tries and the time allocated.
1061     * @param numAttempt how many times we have tried by now
1062     */
1063    boolean canTryMore(int numAttempt) {
1064      // If there is a single try we must not take into account the time.
1065      return numAttempt < maxTries || (maxTries > 1 &&
1066          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
1067    }
1068
1069    /**
1070     * Calculates the back-off time for a retrying request to a particular server.
1071     *
1072     * @param server    The server in question.
     * @param basePause The default client pause (hbase.client.pause).
1074     * @return The time to wait before sending next request.
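     * <p>
     * Illustrative: with a 100 ms base pause and the default retry backoff table, a server that
     * keeps failing is retried after waits of roughly 100, 200, 300, 500, 1000, ... ms.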
1075     */
1076    long calculateBackoffTime(ServerName server, long basePause) {
1077      long result;
1078      ServerErrors errorStats = errorsByServer.get(server);
1079      if (errorStats != null) {
1080        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
1081      } else {
1082        result = 0; // yes, if the server is not in our list we don't wait before retrying.
1083      }
1084      return result;
1085    }
1086
1087    /**
     * Reports that there was an error on the server so we can do whatever bean-counting is necessary.
1089     * @param server The server in question.
1090     */
1091    void reportServerError(ServerName server) {
1092      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
1093    }
1094
1095    long getStartTrackingTime() {
1096      return startTrackingTime;
1097    }
1098
1099    /**
1100     * The record of errors for a server.
1101     */
1102    private static class ServerErrors {
1103      private final AtomicInteger retries = new AtomicInteger(0);
1104
1105      public int getCount() {
1106        return retries.get();
1107      }
1108
1109      public void addError() {
1110        retries.incrementAndGet();
1111      }
1112    }
1113  }
1114
1115  /**
   * Class to make a MasterService stub.
1117   */
1118  private final class MasterServiceStubMaker {
1119
1120    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
1121        throws IOException {
1122      try {
1123        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1124      } catch (ServiceException e) {
1125        throw ProtobufUtil.handleRemoteException(e);
1126      }
1127    }
1128
1129    /**
1130     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
1131     * services nor their interfaces. Let the caller do appropriate casting.
1132     * @return A stub for master services.
1133     */
1134    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
1135        throws IOException, KeeperException {
1136      ServerName sn = get(registry.getMasterAddress());
1137      if (sn == null) {
1138        String msg = "ZooKeeper available but no active master location found";
1139        LOG.info(msg);
1140        throw new MasterNotRunningException(msg);
1141      }
1142      if (isDeadServer(sn)) {
1143        throw new MasterNotRunningException(sn + " is dead.");
1144      }
1145      // Use the security info interface name as our stub key
1146      String key =
1147          getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
1148      MasterProtos.MasterService.BlockingInterface stub =
1149          (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1150            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1151            return MasterProtos.MasterService.newBlockingStub(channel);
1152          });
1153      isMasterRunning(stub);
1154      return stub;
1155    }
1156
1157    /**
1158     * Create a stub against the master. Retry if necessary.
     * @return A stub for the master services.
1160     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
1161     */
1162    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
1163      // The lock must be at the beginning to prevent multiple master creations
1164      // (and leaks) in a multithread context
1165      synchronized (masterLock) {
1166        Exception exceptionCaught = null;
1167        if (!closed) {
1168          try {
1169            return makeStubNoRetries();
1170          } catch (IOException e) {
1171            exceptionCaught = e;
1172          } catch (KeeperException e) {
1173            exceptionCaught = e;
1174          }
1175          throw new MasterNotRunningException(exceptionCaught);
1176        } else {
1177          throw new DoNotRetryIOException("Connection was closed while trying to get master");
1178        }
1179      }
1180    }
1181  }
1182
1183  @Override
1184  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
1185    return getAdmin(get(registry.getMasterAddress()));
1186  }
1187
1188  @Override
1189  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
1190      throws IOException {
1191    checkClosed();
1192    if (isDeadServer(serverName)) {
1193      throw new RegionServerStoppedException(serverName + " is dead.");
1194    }
1195    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
1196      this.hostnamesCanChange);
1197    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1198      BlockingRpcChannel channel =
1199          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1200      return AdminProtos.AdminService.newBlockingStub(channel);
1201    });
1202  }
1203
1204  @Override
1205  public BlockingInterface getClient(ServerName serverName) throws IOException {
1206    checkClosed();
1207    if (isDeadServer(serverName)) {
1208      throw new RegionServerStoppedException(serverName + " is dead.");
1209    }
1210    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
1211      serverName, this.hostnamesCanChange);
1212    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1213      BlockingRpcChannel channel =
1214          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1215      return ClientProtos.ClientService.newBlockingStub(channel);
1216    });
1217  }
1218
1219  final MasterServiceState masterServiceState = new MasterServiceState(this);
1220
1221  @Override
1222  public MasterKeepAliveConnection getMaster() throws IOException {
1223    return getKeepAliveMasterService();
1224  }
1225
1226  private void resetMasterServiceState(final MasterServiceState mss) {
1227    mss.userCount++;
1228  }
1229
1230  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
1231    synchronized (masterLock) {
1232      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1233        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1234        this.masterServiceState.stub = stubMaker.makeStub();
1235      }
1236      resetMasterServiceState(this.masterServiceState);
1237    }
1238    // Ugly delegation just so we can add in a Close method.
1239    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
1240    return new MasterKeepAliveConnection() {
1241      MasterServiceState mss = masterServiceState;
1242
1243      @Override
1244      public MasterProtos.AbortProcedureResponse abortProcedure(
1245          RpcController controller,
1246          MasterProtos.AbortProcedureRequest request) throws ServiceException {
1247        return stub.abortProcedure(controller, request);
1248      }
1249
1250      @Override
1251      public MasterProtos.GetProceduresResponse getProcedures(
1252          RpcController controller,
1253          MasterProtos.GetProceduresRequest request) throws ServiceException {
1254        return stub.getProcedures(controller, request);
1255      }
1256
1257      @Override
1258      public MasterProtos.GetLocksResponse getLocks(
1259          RpcController controller,
1260          MasterProtos.GetLocksRequest request) throws ServiceException {
1261        return stub.getLocks(controller, request);
1262      }
1263
1264      @Override
1265      public MasterProtos.AddColumnResponse addColumn(
1266          RpcController controller,
1267          MasterProtos.AddColumnRequest request) throws ServiceException {
1268        return stub.addColumn(controller, request);
1269      }
1270
1271      @Override
1272      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
1273          MasterProtos.DeleteColumnRequest request)
1274      throws ServiceException {
1275        return stub.deleteColumn(controller, request);
1276      }
1277
1278      @Override
1279      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
1280          MasterProtos.ModifyColumnRequest request)
1281      throws ServiceException {
1282        return stub.modifyColumn(controller, request);
1283      }
1284
1285      @Override
1286      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
1287          MasterProtos.MoveRegionRequest request) throws ServiceException {
1288        return stub.moveRegion(controller, request);
1289      }
1290
1291      @Override
1292      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(
1293          RpcController controller, MasterProtos.MergeTableRegionsRequest request)
1294          throws ServiceException {
1295        return stub.mergeTableRegions(controller, request);
1296      }
1297
1298      @Override
1299      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
1300          MasterProtos.AssignRegionRequest request) throws ServiceException {
1301        return stub.assignRegion(controller, request);
1302      }
1303
1304      @Override
1305      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
1306          MasterProtos.UnassignRegionRequest request) throws ServiceException {
1307        return stub.unassignRegion(controller, request);
1308      }
1309
1310      @Override
1311      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
1312          MasterProtos.OfflineRegionRequest request) throws ServiceException {
1313        return stub.offlineRegion(controller, request);
1314      }
1315
1316      @Override
1317      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
1318          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
1319        return stub.splitRegion(controller, request);
1320      }
1321
1322      @Override
1323      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
1324          MasterProtos.DeleteTableRequest request) throws ServiceException {
1325        return stub.deleteTable(controller, request);
1326      }
1327
1328      @Override
1329      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
1330          MasterProtos.TruncateTableRequest request) throws ServiceException {
1331        return stub.truncateTable(controller, request);
1332      }
1333
1334      @Override
1335      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
1336          MasterProtos.EnableTableRequest request) throws ServiceException {
1337        return stub.enableTable(controller, request);
1338      }
1339
1340      @Override
1341      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
1342          MasterProtos.DisableTableRequest request) throws ServiceException {
1343        return stub.disableTable(controller, request);
1344      }
1345
1346      @Override
1347      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
1348          MasterProtos.ModifyTableRequest request) throws ServiceException {
1349        return stub.modifyTable(controller, request);
1350      }
1351
1352      @Override
1353      public MasterProtos.CreateTableResponse createTable(RpcController controller,
1354          MasterProtos.CreateTableRequest request) throws ServiceException {
1355        return stub.createTable(controller, request);
1356      }
1357
1358      @Override
1359      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
1360          MasterProtos.ShutdownRequest request) throws ServiceException {
1361        return stub.shutdown(controller, request);
1362      }
1363
1364      @Override
1365      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
1366          MasterProtos.StopMasterRequest request) throws ServiceException {
1367        return stub.stopMaster(controller, request);
1368      }
1369
1370      @Override
1371      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1372          final RpcController controller,
1373          final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
1374        return stub.isMasterInMaintenanceMode(controller, request);
1375      }
1376
1377      @Override
1378      public MasterProtos.BalanceResponse balance(RpcController controller,
1379          MasterProtos.BalanceRequest request) throws ServiceException {
1380        return stub.balance(controller, request);
1381      }
1382
1383      @Override
1384      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(
1385          RpcController controller, MasterProtos.SetBalancerRunningRequest request)
1386          throws ServiceException {
1387        return stub.setBalancerRunning(controller, request);
1388      }
1389
1390      @Override
1391      public NormalizeResponse normalize(RpcController controller,
1392          NormalizeRequest request) throws ServiceException {
1393        return stub.normalize(controller, request);
1394      }
1395
1396      @Override
1397      public SetNormalizerRunningResponse setNormalizerRunning(
1398          RpcController controller, SetNormalizerRunningRequest request)
1399          throws ServiceException {
1400        return stub.setNormalizerRunning(controller, request);
1401      }
1402
1403      @Override
1404      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
1405          MasterProtos.RunCatalogScanRequest request) throws ServiceException {
1406        return stub.runCatalogScan(controller, request);
1407      }
1408
1409      @Override
1410      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
1411          RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
1412          throws ServiceException {
1413        return stub.enableCatalogJanitor(controller, request);
1414      }
1415
1416      @Override
1417      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1418          RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
1419          throws ServiceException {
1420        return stub.isCatalogJanitorEnabled(controller, request);
1421      }
1422
1423      @Override
1424      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
1425          MasterProtos.RunCleanerChoreRequest request)
1426          throws ServiceException {
1427        return stub.runCleanerChore(controller, request);
1428      }
1429
1430      @Override
1431      public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning(
1432          RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request)
1433          throws ServiceException {
1434        return stub.setCleanerChoreRunning(controller, request);
1435      }
1436
1437      @Override
1438      public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled(
1439          RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request)
1440          throws ServiceException {
1441        return stub.isCleanerChoreEnabled(controller, request);
1442      }
1443
1444      @Override
1445      public ClientProtos.CoprocessorServiceResponse execMasterService(
1446          RpcController controller, ClientProtos.CoprocessorServiceRequest request)
1447          throws ServiceException {
1448        return stub.execMasterService(controller, request);
1449      }
1450
1451      @Override
1452      public MasterProtos.SnapshotResponse snapshot(RpcController controller,
1453          MasterProtos.SnapshotRequest request) throws ServiceException {
1454        return stub.snapshot(controller, request);
1455      }
1456
1457      @Override
1458      public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
1459          RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
1460          throws ServiceException {
1461        return stub.getCompletedSnapshots(controller, request);
1462      }
1463
1464      @Override
1465      public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1466          MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
1467        return stub.deleteSnapshot(controller, request);
1468      }
1469
1470      @Override
1471      public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1472          MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
1473        return stub.isSnapshotDone(controller, request);
1474      }
1475
1476      @Override
1477      public MasterProtos.RestoreSnapshotResponse restoreSnapshot(
1478          RpcController controller, MasterProtos.RestoreSnapshotRequest request)
1479          throws ServiceException {
1480        return stub.restoreSnapshot(controller, request);
1481      }
1482
1483      @Override
1484      public MasterProtos.ExecProcedureResponse execProcedure(
1485          RpcController controller, MasterProtos.ExecProcedureRequest request)
1486          throws ServiceException {
1487        return stub.execProcedure(controller, request);
1488      }
1489
1490      @Override
1491      public MasterProtos.ExecProcedureResponse execProcedureWithRet(
1492          RpcController controller, MasterProtos.ExecProcedureRequest request)
1493          throws ServiceException {
1494        return stub.execProcedureWithRet(controller, request);
1495      }
1496
1497      @Override
1498      public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
1499          MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
1500        return stub.isProcedureDone(controller, request);
1501      }
1502
1503      @Override
1504      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
1505          MasterProtos.GetProcedureResultRequest request) throws ServiceException {
1506        return stub.getProcedureResult(controller, request);
1507      }
1508
1509      @Override
1510      public MasterProtos.IsMasterRunningResponse isMasterRunning(
1511          RpcController controller, MasterProtos.IsMasterRunningRequest request)
1512          throws ServiceException {
1513        return stub.isMasterRunning(controller, request);
1514      }
1515
1516      @Override
1517      public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller,
1518          MasterProtos.ModifyNamespaceRequest request)
          throws ServiceException {
1520        return stub.modifyNamespace(controller, request);
1521      }
1522
1523      @Override
1524      public MasterProtos.CreateNamespaceResponse createNamespace(
1525          RpcController controller,
1526          MasterProtos.CreateNamespaceRequest request) throws ServiceException {
1527        return stub.createNamespace(controller, request);
1528      }
1529
1530      @Override
1531      public MasterProtos.DeleteNamespaceResponse deleteNamespace(
1532          RpcController controller,
1533          MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
1534        return stub.deleteNamespace(controller, request);
1535      }
1536
1537      @Override
1538      public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
1539          RpcController controller,
1540          MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
1541        return stub.getNamespaceDescriptor(controller, request);
1542      }
1543
1544      @Override
1545      public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
1546          RpcController controller,
1547          MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
1548        return stub.listNamespaceDescriptors(controller, request);
1549      }
1550
1551      @Override
1552      public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
1553          RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
          throws ServiceException {
1555        return stub.listTableDescriptorsByNamespace(controller, request);
1556      }
1557
1558      @Override
1559      public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
1560          RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
          throws ServiceException {
1562        return stub.listTableNamesByNamespace(controller, request);
1563      }
1564
1565      @Override
1566      public MasterProtos.GetTableStateResponse getTableState(
          RpcController controller, MasterProtos.GetTableStateRequest request)
          throws ServiceException {
1569        return stub.getTableState(controller, request);
1570      }
1571
1572      @Override
1573      public void close() {
1574        release(this.mss);
1575      }
1576
1577      @Override
1578      public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
1579          RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
1580          throws ServiceException {
1581        return stub.getSchemaAlterStatus(controller, request);
1582      }
1583
1584      @Override
1585      public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(
1586          RpcController controller, MasterProtos.GetTableDescriptorsRequest request)
1587          throws ServiceException {
1588        return stub.getTableDescriptors(controller, request);
1589      }
1590
1591      @Override
1592      public MasterProtos.GetTableNamesResponse getTableNames(
1593          RpcController controller, MasterProtos.GetTableNamesRequest request)
1594          throws ServiceException {
1595        return stub.getTableNames(controller, request);
1596      }
1597
1598      @Override
1599      public MasterProtos.GetClusterStatusResponse getClusterStatus(
1600          RpcController controller, MasterProtos.GetClusterStatusRequest request)
1601          throws ServiceException {
1602        return stub.getClusterStatus(controller, request);
1603      }
1604
1605      @Override
1606      public MasterProtos.SetQuotaResponse setQuota(
1607          RpcController controller, MasterProtos.SetQuotaRequest request)
1608          throws ServiceException {
1609        return stub.setQuota(controller, request);
1610      }
1611
1612      @Override
1613      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
1614          RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
1615          throws ServiceException {
1616        return stub.getLastMajorCompactionTimestamp(controller, request);
1617      }
1618
1619      @Override
1620      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
1621          RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
1622          throws ServiceException {
1623        return stub.getLastMajorCompactionTimestampForRegion(controller, request);
1624      }
1625
1626      @Override
1627      public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
1628          IsBalancerEnabledRequest request) throws ServiceException {
1629        return stub.isBalancerEnabled(controller, request);
1630      }
1631
1632      @Override
1633      public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
          RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request)
          throws ServiceException {
1636        return stub.setSplitOrMergeEnabled(controller, request);
1637      }
1638
1639      @Override
1640      public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
          RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request)
          throws ServiceException {
1643        return stub.isSplitOrMergeEnabled(controller, request);
1644      }
1645
1646      @Override
1647      public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
1648          IsNormalizerEnabledRequest request) throws ServiceException {
1649        return stub.isNormalizerEnabled(controller, request);
1650      }
1651
1652      @Override
1653      public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
1654          SecurityCapabilitiesRequest request) throws ServiceException {
1655        return stub.getSecurityCapabilities(controller, request);
1656      }
1657
1658      @Override
1659      public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
1660          AddReplicationPeerRequest request) throws ServiceException {
1661        return stub.addReplicationPeer(controller, request);
1662      }
1663
1664      @Override
1665      public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
1666          RemoveReplicationPeerRequest request) throws ServiceException {
1667        return stub.removeReplicationPeer(controller, request);
1668      }
1669
1670      @Override
1671      public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
1672          EnableReplicationPeerRequest request) throws ServiceException {
1673        return stub.enableReplicationPeer(controller, request);
1674      }
1675
1676      @Override
1677      public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
1678          DisableReplicationPeerRequest request) throws ServiceException {
1679        return stub.disableReplicationPeer(controller, request);
1680      }
1681
1682      @Override
      public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
          RpcController controller, ListDecommissionedRegionServersRequest request)
          throws ServiceException {
1685        return stub.listDecommissionedRegionServers(controller, request);
1686      }
1687
1688      @Override
1689      public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
1690          DecommissionRegionServersRequest request) throws ServiceException {
1691        return stub.decommissionRegionServers(controller, request);
1692      }
1693
1694      @Override
1695      public RecommissionRegionServerResponse recommissionRegionServer(
1696          RpcController controller, RecommissionRegionServerRequest request)
1697          throws ServiceException {
1698        return stub.recommissionRegionServer(controller, request);
1699      }
1700
1701      @Override
1702      public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
1703          GetReplicationPeerConfigRequest request) throws ServiceException {
1704        return stub.getReplicationPeerConfig(controller, request);
1705      }
1706
1707      @Override
1708      public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig(
1709          RpcController controller, UpdateReplicationPeerConfigRequest request)
1710          throws ServiceException {
1711        return stub.updateReplicationPeerConfig(controller, request);
1712      }
1713
1714      @Override
1715      public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
1716          ListReplicationPeersRequest request) throws ServiceException {
1717        return stub.listReplicationPeers(controller, request);
1718      }
1719
1720      @Override
1721      public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
1722          RpcController controller, GetSpaceQuotaRegionSizesRequest request)
1723          throws ServiceException {
1724        return stub.getSpaceQuotaRegionSizes(controller, request);
1725      }
1726
1727      @Override
1728      public GetQuotaStatesResponse getQuotaStates(
1729          RpcController controller, GetQuotaStatesRequest request) throws ServiceException {
1730        return stub.getQuotaStates(controller, request);
1731      }
1732
1733      @Override
1734      public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller,
1735          MasterProtos.ClearDeadServersRequest request) throws ServiceException {
1736        return stub.clearDeadServers(controller, request);
1737      }
1738    };
1739  }
1740
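  /**
   * Releases the shared master stub tracked by the given state by delegating to
   * {@link #releaseMaster(MasterServiceState)} on the owning connection, if any.
   */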
1741  private static void release(MasterServiceState mss) {
1742    if (mss != null && mss.connection != null) {
1743      ((ConnectionImplementation)mss.connection).releaseMaster(mss);
1744    }
1745  }
1746
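  /**
   * Returns true only if a master stub is cached and it still answers an isMasterRunning probe.
   * Connection-level failures are logged and treated as "not running".
   */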
1747  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
    if (mss.getStub() == null) {
1749      return false;
1750    }
1751    try {
1752      return mss.isMasterRunning();
1753    } catch (UndeclaredThrowableException e) {
      // It's somewhat messy, but we can receive undeclared exceptions such as
      // java.net.ConnectException wrapped in an UndeclaredThrowableException, so catch it here.
1756      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
1757      return false;
1758    } catch (IOException se) {
1759      LOG.warn("Checking master connection", se);
1760      return false;
1761    }
1762  }
1763
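  /** Decrements the keep-alive user count for the shared master stub, guarded by masterLock. */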
1764  void releaseMaster(MasterServiceState mss) {
1765    if (mss.getStub() == null) {
1766      return;
1767    }
1768    synchronized (masterLock) {
1769      --mss.userCount;
1770    }
1771  }
1772
1773  private void closeMasterService(MasterServiceState mss) {
1774    if (mss.getStub() != null) {
1775      LOG.info("Closing master protocol: " + mss);
1776      mss.clearStub();
1777    }
1778    mss.userCount = 0;
1779  }
1780
1781  /**
   * Immediate close of the shared master. Can be called by the delayed close or when closing
   * the connection itself.
1784   */
1785  private void closeMaster() {
1786    synchronized (masterLock) {
1787      closeMasterService(masterServiceState);
1788    }
1789  }
1790
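  /**
   * Caches a new {@link HRegionLocation} built from the given region, server and sequence
   * number, attributing the update to {@code source}.
   */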
1791  void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) {
1792    HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
1793    cacheLocation(hri.getTable(), source, newHrl);
1794  }
1795
1796  @Override
1797  public void deleteCachedRegionLocation(final HRegionLocation location) {
1798    metaCache.clearCache(location);
1799  }
1800
1801  /**
1802   * Update the location with the new value (if the exception is a RegionMovedException)
1803   * or delete it from the cache. Does nothing if we can be sure from the exception that
1804   * the location is still accurate, or if the cache has already been updated.
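   * <p>
   * A hypothetical caller sketch (the variable names are illustrative, not actual call sites):
   * <pre>{@code
   * try {
   *   // some RPC against 'source' for a row of 'tableName'
   * } catch (IOException ioe) {
   *   conn.updateCachedLocations(tableName, regionName, row, ioe, source);
   *   // a nested RegionMovedException re-points the cache at the new server,
   *   // any other cache-clearing exception simply drops the stale entry
   * }
   * }</pre>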
   * @param exception an object (to simplify user code) in which we will look for a nested or
   *   wrapped RegionMovedException
1807   * @param source server that is the source of the location update.
1808   */
1809  @Override
1810  public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
1811    final Object exception, final ServerName source) {
1812    if (rowkey == null || tableName == null) {
1813      LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
1814          ", tableName=" + (tableName == null ? "null" : tableName));
1815      return;
1816    }
1817
1818    if (source == null) {
1819      // This should not happen, but let's secure ourselves.
1820      return;
1821    }
1822
1823    if (regionName == null) {
1824      // we do not know which region, so just remove the cache entry for the row and server
1825      if (metrics != null) {
1826        metrics.incrCacheDroppingExceptions(exception);
1827      }
1828      metaCache.clearCache(tableName, rowkey, source);
1829      return;
1830    }
1831
1832    // Is it something we have already updated?
1833    final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
1834    HRegionLocation oldLocation = null;
1835    if (oldLocations != null) {
1836      oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
1837    }
1838    if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
1839      // There is no such location in the cache (it's been removed already) or
1840      // the cache has already been refreshed with a different location.  => nothing to do
1841      return;
1842    }
1843
1844    RegionInfo regionInfo = oldLocation.getRegion();
1845    Throwable cause = ClientExceptionsUtil.findException(exception);
1846    if (cause != null) {
1847      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
1848        // We know that the region is still on this region server
1849        return;
1850      }
1851
1852      if (cause instanceof RegionMovedException) {
1853        RegionMovedException rme = (RegionMovedException) cause;
1854        if (LOG.isTraceEnabled()) {
1855          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
1856              rme.getHostname() + ":" + rme.getPort() +
1857              " according to " + source.getAddress());
1858        }
        // We know that the region is no longer on this region server, but we know
        // the new location.
1861        updateCachedLocation(
1862            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
1863        return;
1864      }
1865    }
1866
1867    if (metrics != null) {
1868      metrics.incrCacheDroppingExceptions(exception);
1869    }
1870
    // If we're here, it means that we cannot be sure about the location, so we remove it from
    // the cache. Do not pass the source because it may be a new server on the same host:port.
1873    metaCache.clearCache(regionInfo);
1874  }
1875
1876  @Override
1877  public AsyncProcess getAsyncProcess() {
1878    return asyncProcess;
1879  }
1880
1881  @Override
1882  public ServerStatisticTracker getStatisticsTracker() {
1883    return this.stats;
1884  }
1885
1886  @Override
1887  public ClientBackoffPolicy getBackoffPolicy() {
1888    return this.backoffPolicy;
1889  }
1890
1891  /*
   * Return the number of cached regions for a table. It will only be called
1893   * from a unit test.
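   * A test assertion against it might look like (illustrative only):
   *   assertEquals(0, conn.getNumberOfCachedRegionLocations(tableName));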
1894   */
1895  @VisibleForTesting
1896  int getNumberOfCachedRegionLocations(final TableName tableName) {
1897    return metaCache.getNumberOfCachedRegionLocations(tableName);
1898  }
1899
1900  @Override
1901  public void abort(final String msg, Throwable t) {
1902    if (t != null) {
1903      LOG.error(HBaseMarkers.FATAL, msg, t);
1904    } else {
1905      LOG.error(HBaseMarkers.FATAL, msg);
1906    }
1907    this.aborted = true;
1908    close();
1909    this.closed = true;
1910  }
1911
1912  @Override
1913  public boolean isClosed() {
1914    return this.closed;
1915  }
1916
1917  @Override
  public boolean isAborted() {
1919    return this.aborted;
1920  }
1921
1922  @Override
1923  public int getCurrentNrHRS() throws IOException {
1924    return get(this.registry.getCurrentNrHRS());
1925  }
1926
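  /**
   * Closes the shared master stub, thread pools, client metrics, the cluster registry, cached
   * server stubs, the cluster status listener and finally the RPC client. Subsequent calls are
   * no-ops because the closed flag is checked first.
   */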
1927  @Override
1928  public void close() {
1929    if (this.closed) {
1930      return;
1931    }
1932    closeMaster();
1933    shutdownPools();
1934    if (this.metrics != null) {
1935      this.metrics.shutdown();
1936    }
1937    this.closed = true;
1938    registry.close();
1939    this.stubs.clear();
1940    if (clusterStatusListener != null) {
1941      clusterStatusListener.close();
1942    }
1943    if (rpcClient != null) {
1944      rpcClient.close();
1945    }
1946  }
1947
1948  /**
1949   * Close the connection for good. On the off chance that someone is unable to close
1950   * the connection, perhaps because it bailed out prematurely, the method
1951   * below will ensure that this instance is cleaned up.
1952   * Caveat: The JVM may take an unknown amount of time to call finalize on an
1953   * unreachable object, so our hope is that every consumer cleans up after
1954   * itself, like any good citizen.
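   * <p>
   * The intended usage pattern remains an explicit close, for example via try-with-resources
   * (a usage sketch, not code from this class):
   * <pre>{@code
   * try (Connection conn = ConnectionFactory.createConnection(conf)) {
   *   // use the connection; it is closed automatically on exit
   * }
   * }</pre>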
1955   */
1956  @Override
1957  protected void finalize() throws Throwable {
1958    super.finalize();
1959    close();
1960  }
1961
1962  @Override
1963  public NonceGenerator getNonceGenerator() {
1964    return nonceGenerator;
1965  }
1966
1967  @Override
1968  public TableState getTableState(TableName tableName) throws IOException {
1969    checkClosed();
1970    TableState tableState = MetaTableAccessor.getTableState(this, tableName);
1971    if (tableState == null) {
1972      throw new TableNotFoundException(tableName);
1973    }
1974    return tableState;
1975  }
1976
1977  @Override
1978  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
1979    return RpcRetryingCallerFactory
1980        .instantiate(conf, this.interceptor, this.getStatisticsTracker());
1981  }
1982
1983  @Override
1984  public boolean hasCellBlockSupport() {
1985    return this.rpcClient.hasCellBlockSupport();
1986  }
1987
1988  @Override
1989  public ConnectionConfiguration getConnectionConfiguration() {
1990    return this.connectionConfig;
1991  }
1992
1993  @Override
1994  public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
1995    return this.rpcCallerFactory;
1996  }
1997
1998  @Override
1999  public RpcControllerFactory getRpcControllerFactory() {
2000    return this.rpcControllerFactory;
2001  }
2002
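  /**
   * Blocks on the given future and converts failures into IOExceptions: an interruption
   * re-asserts the thread's interrupt status and surfaces as an InterruptedIOException, while
   * an ExecutionException has its cause rethrown when possible, otherwise wrapped in a new
   * IOException.
   */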
2003  private static <T> T get(CompletableFuture<T> future) throws IOException {
2004    try {
2005      return future.get();
2006    } catch (InterruptedException e) {
2007      Thread.currentThread().interrupt();
2008      throw (IOException) new InterruptedIOException().initCause(e);
2009    } catch (ExecutionException e) {
2010      Throwable cause = e.getCause();
2011      Throwables.propagateIfPossible(cause, IOException.class);
2012      throw new IOException(cause);
2013    }
2014  }
2015}