001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
024import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
025import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
026import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx;
027
028import edu.umd.cs.findbugs.annotations.Nullable;
029import java.io.Closeable;
030import java.io.IOException;
031import java.io.InterruptedIOException;
032import java.lang.reflect.UndeclaredThrowableException;
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.Date;
036import java.util.List;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.LinkedBlockingQueue;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.atomic.AtomicInteger;
047import java.util.concurrent.locks.ReentrantLock;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.AuthUtil;
050import org.apache.hadoop.hbase.CallQueueTooBigException;
051import org.apache.hadoop.hbase.ChoreService;
052import org.apache.hadoop.hbase.DoNotRetryIOException;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionLocation;
055import org.apache.hadoop.hbase.MasterNotRunningException;
056import org.apache.hadoop.hbase.MetaTableAccessor;
057import org.apache.hadoop.hbase.RegionLocations;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.TableNotEnabledException;
061import org.apache.hadoop.hbase.TableNotFoundException;
062import org.apache.hadoop.hbase.ZooKeeperConnectionException;
063import org.apache.hadoop.hbase.client.Scan.ReadType;
064import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
065import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
066import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
067import org.apache.hadoop.hbase.exceptions.RegionMovedException;
068import org.apache.hadoop.hbase.ipc.RpcClient;
069import org.apache.hadoop.hbase.ipc.RpcClientFactory;
070import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
071import org.apache.hadoop.hbase.log.HBaseMarkers;
072import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
073import org.apache.hadoop.hbase.security.User;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.ExceptionUtil;
077import org.apache.hadoop.hbase.util.Pair;
078import org.apache.hadoop.hbase.util.ReflectionUtils;
079import org.apache.hadoop.hbase.util.Threads;
080import org.apache.hadoop.ipc.RemoteException;
081import org.apache.hadoop.security.UserGroupInformation;
082import org.apache.yetus.audience.InterfaceAudience;
083import org.apache.zookeeper.KeeperException;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
088import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
089import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
090import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
091import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
092
093import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
094import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
146
147/**
148 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
149 * Encapsulates connection to zookeeper and regionservers.
150 */
151@edu.umd.cs.findbugs.annotations.SuppressWarnings(
152    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
153    justification="Access to the conncurrent hash map is under a lock so should be fine.")
154@InterfaceAudience.Private
class ConnectionImplementation implements ClusterConnection, Closeable {
  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

  // Value of RESOLVE_HOSTNAME_ON_FAIL_KEY; passed into getStubKey() when building server stub
  // cache keys (see getHbck(ServerName)).
  private final boolean hostnamesCanChange;
  // Base retry pause (HBASE_CLIENT_PAUSE).
  private final long pause;
  private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
  // Mutable only for tests, via setUseMetaReplicas().
  private boolean useMetaReplicas;
  private final int metaReplicaCallTimeoutScanInMicroSecond;
  // Number of attempts = configured retry count plus one (see retries2Attempts).
  private final int numTries;
  final int rpcTimeout;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
  private static final Object nonceGeneratorCreateLock = new Object();

  private final AsyncProcess asyncProcess;
  // single tracker per connection
  private final ServerStatisticTracker stats;

  private volatile boolean closed;
  private volatile boolean aborted;

  // package protected for the tests
  ClusterStatusListener clusterStatusListener;

  private final Object metaRegionLock = new Object();

  private final Object masterLock = new Object();

  // thread executor shared by all Table instances created
  // by this connection
  private volatile ThreadPoolExecutor batchPool = null;
  // meta thread executor shared by all Table instances created
  // by this connection
  private volatile ThreadPoolExecutor metaLookupPool = null;
  // True when this connection created batchPool itself and must shut it down
  // (see getBatchPool() and shutdownPools()).
  private volatile boolean cleanupPool = false;

  private final Configuration conf;

  // cache the configuration value for tables so that we can avoid calling
  // the expensive Configuration to fetch the value multiple times.
  private final ConnectionConfiguration connectionConfig;

  // Client rpc instance.
  private final RpcClient rpcClient;

  private final MetaCache metaCache;
  // Null when client-side metrics are disabled (CLIENT_SIDE_METRICS_ENABLED_KEY).
  private final MetricsConnection metrics;

  protected User user;

  private final RpcRetryingCallerFactory rpcCallerFactory;

  private final RpcControllerFactory rpcControllerFactory;

  private final RetryingCallerInterceptor interceptor;

  /**
   * Cluster registry of basic info such as clusterid and meta region location.
   */
  private final AsyncRegistry registry;

  private final ClientBackoffPolicy backoffPolicy;

  /**
   * Allow setting an alternate BufferedMutator implementation via
   * config. If null, use default.
   */
  private final String alternateBufferedMutatorClassName;

  /** lock guards against multiple threads trying to query the meta region at the same time */
  private final ReentrantLock userRegionLock = new ReentrantLock();

  // Keytab relogin chore service; only created when the user logged in from a keytab
  // (see spawnRenewalChore()).
  private ChoreService authService;
236
237  /**
238   * constructor
239   * @param conf Configuration object
240   */
241  ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
242    this.conf = conf;
243    this.user = user;
244    if (user != null && user.isLoginFromKeytab()) {
245      spawnRenewalChore(user.getUGI());
246    }
247    this.batchPool = (ThreadPoolExecutor) pool;
248    this.connectionConfig = new ConnectionConfiguration(conf);
249    this.closed = false;
250    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
251        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
252    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
253    if (configuredPauseForCQTBE < pause) {
254      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
255          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
256          + ", will use " + pause + " instead.");
257      this.pauseForCQTBE = pause;
258    } else {
259      this.pauseForCQTBE = configuredPauseForCQTBE;
260    }
261    this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
262      HConstants.DEFAULT_USE_META_REPLICAS);
263    this.metaReplicaCallTimeoutScanInMicroSecond =
264        connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
265
266    // how many times to try, one more than max *retry* time
267    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
268    this.rpcTimeout = conf.getInt(
269        HConstants.HBASE_RPC_TIMEOUT_KEY,
270        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
271    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
272      synchronized (nonceGeneratorCreateLock) {
273        if (nonceGenerator == null) {
274          nonceGenerator = PerClientRandomNonceGenerator.get();
275        }
276      }
277    } else {
278      nonceGenerator = NO_NONCE_GENERATOR;
279    }
280
281    this.stats = ServerStatisticTracker.create(conf);
282    this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
283    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
284    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
285    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
286    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
287    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
288      this.metrics =
289        new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
290    } else {
291      this.metrics = null;
292    }
293    this.metaCache = new MetaCache(this.metrics);
294
295    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
296        HConstants.STATUS_PUBLISHED_DEFAULT);
297    this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
298    Class<? extends ClusterStatusListener.Listener> listenerClass =
299        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
300            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
301            ClusterStatusListener.Listener.class);
302
303    // Is there an alternate BufferedMutator to use?
304    this.alternateBufferedMutatorClassName =
305        this.conf.get(BufferedMutator.CLASSNAME_KEY);
306
307    try {
308      this.registry = AsyncRegistryFactory.getRegistry(conf);
309      retrieveClusterId();
310
311      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
312
313      // Do we publish the status?
314      if (shouldListen) {
315        if (listenerClass == null) {
316          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
317              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
318        } else {
319          clusterStatusListener = new ClusterStatusListener(
320              new ClusterStatusListener.DeadServerHandler() {
321                @Override
322                public void newDead(ServerName sn) {
323                  clearCaches(sn);
324                  rpcClient.cancelConnections(sn);
325                }
326              }, conf, listenerClass);
327        }
328      }
329    } catch (Throwable e) {
330      // avoid leaks: registry, rpcClient, ...
331      LOG.debug("connection construction failed", e);
332      close();
333      throw e;
334    }
335  }
336
  /**
   * Starts a background chore service that periodically re-logs in the given keytab-based
   * user so its credentials do not expire while the connection is in use.
   */
  private void spawnRenewalChore(final UserGroupInformation user) {
    authService = new ChoreService("Relogin service");
    authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
  }
341
342  /**
343   * @param useMetaReplicas
344   */
345  @VisibleForTesting
346  void setUseMetaReplicas(final boolean useMetaReplicas) {
347    this.useMetaReplicas = useMetaReplicas;
348  }
349
350  /**
351   * @param conn The connection for which to replace the generator.
352   * @param cnm Replaces the nonce generator used, for testing.
353   * @return old nonce generator.
354   */
355  @VisibleForTesting
356  static NonceGenerator injectNonceGeneratorForTesting(
357      ClusterConnection conn, NonceGenerator cnm) {
358    ConnectionImplementation connImpl = (ConnectionImplementation)conn;
359    NonceGenerator ng = connImpl.getNonceGenerator();
360    LOG.warn("Nonce generator is being replaced by test code for "
361      + cnm.getClass().getName());
362    nonceGenerator = cnm;
363    return ng;
364  }
365
  /**
   * Returns a Table for {@code tableName}, backed by this connection's shared batch pool.
   */
  @Override
  public Table getTable(TableName tableName) throws IOException {
    return getTable(tableName, getBatchPool());
  }
370
  /**
   * Returns a builder producing {@link HTable} instances bound to this connection; the
   * supplied {@code pool} runs the table's operations.
   */
  @Override
  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
    return new TableBuilderBase(tableName, connectionConfig) {

      @Override
      public Table build() {
        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
            rpcControllerFactory, pool);
      }
    };
  }
382
  /**
   * Creates a BufferedMutator from {@code params}, filling in any unset values from this
   * connection's configuration. Note that {@code params} is mutated in place. An alternate
   * implementation class may be selected via the params or the
   * {@link BufferedMutator#CLASSNAME_KEY} config; otherwise {@link BufferedMutatorImpl} is
   * used.
   * @throws IllegalArgumentException if no table name is set on {@code params}
   * @throws RuntimeException if the configured implementation class cannot be loaded
   */
  @Override
  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
    if (params.getTableName() == null) {
      throw new IllegalArgumentException("TableName cannot be null.");
    }
    if (params.getPool() == null) {
      params.pool(HTable.getDefaultExecutor(getConfiguration()));
    }
    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
      params.writeBufferSize(connectionConfig.getWriteBufferSize());
    }
    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
      params.setWriteBufferPeriodicFlushTimeoutMs(
              connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
    }
    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
      params.setWriteBufferPeriodicFlushTimerTickMs(
              connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
    }
    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
    }
    // Look to see if an alternate BufferedMutation implementation is wanted.
    // Look in params and in config. If null, use default.
    String implementationClassName = params.getImplementationClassName();
    if (implementationClassName == null) {
      implementationClassName = this.alternateBufferedMutatorClassName;
    }
    if (implementationClassName == null) {
      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
    }
    try {
      return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
          this, rpcCallerFactory, rpcControllerFactory, params);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
421
  /**
   * Creates a BufferedMutator for {@code tableName} with default parameters.
   */
  @Override
  public BufferedMutator getBufferedMutator(TableName tableName) {
    return getBufferedMutator(new BufferedMutatorParams(tableName));
  }
426
  /**
   * Returns a RegionLocator for {@code tableName}, backed by this connection.
   */
  @Override
  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
    return new HRegionLocator(tableName, this);
  }
431
  /**
   * Returns an Admin instance backed by this connection.
   */
  @Override
  public Admin getAdmin() throws IOException {
    return new HBaseAdmin(this);
  }
436
  /**
   * Returns an Hbck instance pointed at the active master, as resolved through the registry.
   */
  @Override
  public Hbck getHbck() throws IOException {
    return getHbck(get(registry.getMasterAddress()));
  }
441
  /**
   * Returns an Hbck instance bound to the given master server. The underlying blocking stub
   * is created on demand and cached in the connection's stub map, keyed by service name and
   * server.
   * @throws RegionServerStoppedException if the master is known to be dead
   */
  @Override
  public Hbck getHbck(ServerName masterServer) throws IOException {
    checkClosed();
    if (isDeadServer(masterServer)) {
      throw new RegionServerStoppedException(masterServer + " is dead.");
    }
    // Key includes hostnamesCanChange — presumably so re-resolved addresses get fresh
    // stubs; see RESOLVE_HOSTNAME_ON_FAIL_KEY (NOTE(review): confirm against getStubKey).
    String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
      masterServer, this.hostnamesCanChange);

    return new HBaseHbck(
      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
        BlockingRpcChannel channel =
          this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
        return MasterProtos.HbckService.newBlockingStub(channel);
      }), rpcControllerFactory);
  }
458
  /**
   * @return the connection metrics, or null when client-side metrics are disabled
   */
  @Override
  public MetricsConnection getConnectionMetrics() {
    return this.metrics;
  }
463
  /**
   * Lazily creates the shared batch pool (double-checked locking on the volatile field).
   * A pool created here, rather than passed into the constructor, is owned by this
   * connection and is shut down with it (cleanupPool flag).
   */
  private ThreadPoolExecutor getBatchPool() {
    if (batchPool == null) {
      synchronized (this) {
        if (batchPool == null) {
          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
          this.batchPool = getThreadPool(threads, threads, "-shared", null);
          this.cleanupPool = true;
        }
      }
    }
    return this.batchPool;
  }
476
477  private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
478      BlockingQueue<Runnable> passedWorkQueue) {
479    // shared HTable thread executor not yet initialized
480    if (maxThreads == 0) {
481      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
482    }
483    if (coreThreads == 0) {
484      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
485    }
486    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
487    BlockingQueue<Runnable> workQueue = passedWorkQueue;
488    if (workQueue == null) {
489      workQueue =
490        new LinkedBlockingQueue<>(maxThreads *
491            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
492                HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
493      coreThreads = maxThreads;
494    }
495    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
496        coreThreads,
497        maxThreads,
498        keepAliveTime,
499        TimeUnit.SECONDS,
500        workQueue,
501        Threads.newDaemonThreadFactory(toString() + nameHint));
502    tpe.allowCoreThreadTimeOut(true);
503    return tpe;
504  }
505
  /**
   * Lazily creates the shared meta-lookup pool (double-checked locking on the volatile
   * field). Uses an unbounded queue so new threads past core are never started.
   */
  private ThreadPoolExecutor getMetaLookupPool() {
    if (this.metaLookupPool == null) {
      synchronized (this) {
        if (this.metaLookupPool == null) {
          //Some of the threads would be used for meta replicas
          //To start with, threads.max.core threads can hit the meta (including replicas).
          //After that, requests will get queued up in the passed queue, and only after
          //the queue is full, a new thread will be started
          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
          this.metaLookupPool = getThreadPool(
             threads,
             threads,
             "-metaLookup-shared-", new LinkedBlockingQueue<>());
        }
      }
    }
    return this.metaLookupPool;
  }
524
  /** @return the meta lookup pool, or null if it has not been created yet */
  protected ExecutorService getCurrentMetaLookupPool() {
    return metaLookupPool;
  }

  /** @return the batch pool, or null if it has not been created yet */
  protected ExecutorService getCurrentBatchPool() {
    return batchPool;
  }
532
  /**
   * Shuts down the meta lookup pool and, only when this connection created it
   * (cleanupPool), the batch pool. Externally supplied batch pools are left to their owner.
   */
  private void shutdownPools() {
    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
      shutdownBatchPool(this.batchPool);
    }
    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
      shutdownBatchPool(this.metaLookupPool);
    }
  }
541
542  private void shutdownBatchPool(ExecutorService pool) {
543    pool.shutdown();
544    try {
545      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
546        pool.shutdownNow();
547      }
548    } catch (InterruptedException e) {
549      pool.shutdownNow();
550    }
551  }
552
553  /**
554   * For tests only.
555   */
556  @VisibleForTesting
557  RpcClient getRpcClient() {
558    return rpcClient;
559  }
560
561  /**
562   * An identifier that will remain the same for a given connection.
563   */
564  @Override
565  public String toString(){
566    return "hconnection-0x" + Integer.toHexString(hashCode());
567  }
568
569  protected String clusterId = null;
570
571  protected void retrieveClusterId() {
572    if (clusterId != null) {
573      return;
574    }
575    try {
576      this.clusterId = this.registry.getClusterId().get();
577    } catch (InterruptedException | ExecutionException e) {
578      LOG.warn("Retrieve cluster id failed", e);
579    }
580    if (clusterId == null) {
581      clusterId = HConstants.CLUSTER_ID_DEFAULT;
582      LOG.debug("clusterid came back null, using default " + clusterId);
583    }
584  }
585
  /** @return the configuration this connection was created with */
  @Override
  public Configuration getConfiguration() {
    return this.conf;
  }
590
  /**
   * @throws DoNotRetryIOException if this connection has already been closed
   */
  private void checkClosed() throws DoNotRetryIOException {
    if (this.closed) {
      throw new DoNotRetryIOException(toString() + " closed");
    }
  }
596
597  /**
598   * @return true if the master is running, throws an exception otherwise
599   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
600   * @deprecated this has been deprecated without a replacement
601   */
602  @Deprecated
603  @Override
604  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
605    // When getting the master connection, we check it's running,
606    // so if there is no exception, it means we've been able to get a
607    // connection on a running master
608    MasterKeepAliveConnection m;
609    try {
610      m = getKeepAliveMasterService();
611    } catch (IOException e) {
612      throw new MasterNotRunningException(e);
613    }
614    m.close();
615    return true;
616  }
617
618  @Override
619  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
620      boolean reload) throws IOException {
621    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
622  }
623
624
  /** @return true if the table's state is {@code ENABLED} */
  @Override
  public boolean isTableEnabled(TableName tableName) throws IOException {
    return getTableState(tableName).inStates(TableState.State.ENABLED);
  }
629
  /** @return true if the table's state is {@code DISABLED} */
  @Override
  public boolean isTableDisabled(TableName tableName) throws IOException {
    return getTableState(tableName).inStates(TableState.State.DISABLED);
  }
634
635  @Override
636  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
637      throws IOException {
638    checkClosed();
639    try {
640      if (!isTableEnabled(tableName)) {
641        LOG.debug("Table {} not enabled", tableName);
642        return false;
643      }
644      List<Pair<RegionInfo, ServerName>> locations =
645        MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);
646
647      int notDeployed = 0;
648      int regionCount = 0;
649      for (Pair<RegionInfo, ServerName> pair : locations) {
650        RegionInfo info = pair.getFirst();
651        if (pair.getSecond() == null) {
652          LOG.debug("Table {} has not deployed region {}", tableName,
653              pair.getFirst().getEncodedName());
654          notDeployed++;
655        } else if (splitKeys != null
656            && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
657          for (byte[] splitKey : splitKeys) {
658            // Just check if the splitkey is available
659            if (Bytes.equals(info.getStartKey(), splitKey)) {
660              regionCount++;
661              break;
662            }
663          }
664        } else {
665          // Always empty start row should be counted
666          regionCount++;
667        }
668      }
669      if (notDeployed > 0) {
670        if (LOG.isDebugEnabled()) {
671          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
672        }
673        return false;
674      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
675        if (LOG.isDebugEnabled()) {
676          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
677              splitKeys.length + 1, regionCount);
678        }
679        return false;
680      } else {
681        LOG.trace("Table {} should be available", tableName);
682        return true;
683      }
684    } catch (TableNotFoundException tnfe) {
685      LOG.warn("Table {} does not exist", tableName);
686      return false;
687    }
688  }
689
  /**
   * Locates a region by its region name, without using the location cache.
   * @return the region location, or null when it cannot be found
   */
  @Override
  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
      RegionInfo.getStartKey(regionName), false, true);
    return locations == null ? null : locations.getRegionLocation();
  }
696
697  private boolean isDeadServer(ServerName sn) {
698    if (clusterStatusListener == null) {
699      return false;
700    } else {
701      return clusterStatusListener.isDeadServer(sn);
702    }
703  }
704
  /**
   * Lists the locations of all regions of {@code tableName}; delegates with
   * {@code useCache=false} and {@code offlined=true}.
   */
  @Override
  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
    return locateRegions(tableName, false, true);
  }
709
710  @Override
711  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
712      boolean offlined) throws IOException {
713    List<RegionInfo> regions;
714    if (TableName.isMetaTableName(tableName)) {
715      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
716    } else {
717      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
718    }
719    List<HRegionLocation> locations = new ArrayList<>();
720    for (RegionInfo regionInfo : regions) {
721      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
722        continue;
723      }
724      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
725      if (list != null) {
726        for (HRegionLocation loc : list.getRegionLocations()) {
727          if (loc != null) {
728            locations.add(loc);
729          }
730        }
731      }
732    }
733    return locations;
734  }
735
736  @Override
737  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
738      throws IOException {
739    RegionLocations locations = locateRegion(tableName, row, true, true);
740    return locations == null ? null : locations.getRegionLocation();
741  }
742
743  @Override
744  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
745      throws IOException {
746    RegionLocations locations =
747      relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
748    return locations == null ? null
749      : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
750  }
751
752  @Override
753  public RegionLocations relocateRegion(final TableName tableName,
754      final byte [] row, int replicaId) throws IOException{
755    // Since this is an explicit request not to use any caching, finding
756    // disabled tables should not be desirable.  This will ensure that an exception is thrown when
757    // the first time a disabled table is interacted with.
758    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
759      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
760    }
761
762    return locateRegion(tableName, row, false, true, replicaId);
763  }
764
  /**
   * Locate the region containing {@code row} for the default replica. Delegates to the
   * replica-aware overload with {@code RegionReplicaUtil.DEFAULT_REPLICA_ID}.
   */
  @Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
      boolean retry) throws IOException {
    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }
770
771  @Override
772  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
773      boolean retry, int replicaId) throws IOException {
774    checkClosed();
775    if (tableName == null || tableName.getName().length == 0) {
776      throw new IllegalArgumentException("table name cannot be null or zero length");
777    }
778    if (tableName.equals(TableName.META_TABLE_NAME)) {
779      return locateMeta(tableName, useCache, replicaId);
780    } else {
781      // Region not in the cache - have to go to the meta RS
782      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
783    }
784  }
785
  /**
   * Locate the hbase:meta region itself, caching the result so that region lookups do not hit
   * ZooKeeper each time. Uses check / lock / re-check so only one thread performs the registry
   * lookup while concurrent callers wait and then read from the cache.
   * @param tableName the meta table name, used as the cache key's table
   * @param useCache if true, a previously cached meta location may be returned
   * @param replicaId the replica of meta whose location is required
   * @return the meta region locations, or null if the registry returned none
   */
  private RegionLocations locateMeta(final TableName tableName,
      boolean useCache, int replicaId) throws IOException {
    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
    // zookeeper with one request for every region lookup. We cache the META with empty row
    // key in MetaCache.
    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
    RegionLocations locations = null;
    if (useCache) {
      locations = getCachedLocation(tableName, metaCacheKey);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    }

    // only one thread should do the lookup.
    synchronized (metaRegionLock) {
      // Check the cache again for a hit in case some other thread made the
      // same query while we were waiting on the lock.
      if (useCache) {
        locations = getCachedLocation(tableName, metaCacheKey);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }

      // Look up from zookeeper
      locations = get(this.registry.getMetaRegionLocation());
      if (locations != null) {
        cacheLocation(tableName, locations);
      }
    }
    return locations;
  }
819
820  /**
821   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
822   * seeking.
823   */
824  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
825      boolean retry, int replicaId) throws IOException {
826    // If we are supposed to be using the cache, look in the cache to see if we already have the
827    // region.
828    if (useCache) {
829      RegionLocations locations = getCachedLocation(tableName, row);
830      if (locations != null && locations.getRegionLocation(replicaId) != null) {
831        return locations;
832      }
833    }
834    // build the key of the meta region we should be looking for.
835    // the extra 9's on the end are necessary to allow "exact" matches
836    // without knowing the precise region names.
837    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
838    byte[] metaStopKey =
839      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
840    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
841      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
842      .setReadType(ReadType.PREAD);
843    if (this.useMetaReplicas) {
844      s.setConsistency(Consistency.TIMELINE);
845    }
846    int maxAttempts = (retry ? numTries : 1);
847    boolean relocateMeta = false;
848    for (int tries = 0; ; tries++) {
849      if (tries >= maxAttempts) {
850        throw new NoServerForRegionException("Unable to find region for "
851            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
852      }
853      if (useCache) {
854        RegionLocations locations = getCachedLocation(tableName, row);
855        if (locations != null && locations.getRegionLocation(replicaId) != null) {
856          return locations;
857        }
858      } else {
859        // If we are not supposed to be using the cache, delete any existing cached location
860        // so it won't interfere.
861        // We are only supposed to clean the cache for the specific replicaId
862        metaCache.clearCache(tableName, row, replicaId);
863      }
864      // Query the meta region
865      long pauseBase = this.pause;
866      userRegionLock.lock();
867      try {
868        if (useCache) {// re-check cache after get lock
869          RegionLocations locations = getCachedLocation(tableName, row);
870          if (locations != null && locations.getRegionLocation(replicaId) != null) {
871            return locations;
872          }
873        }
874        if (relocateMeta) {
875          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
876            RegionInfo.DEFAULT_REPLICA_ID);
877        }
878        s.resetMvccReadPoint();
879        try (ReversedClientScanner rcs =
880          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
881            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
882          boolean tableNotFound = true;
883          for (;;) {
884            Result regionInfoRow = rcs.next();
885            if (regionInfoRow == null) {
886              if (tableNotFound) {
887                throw new TableNotFoundException(tableName);
888              } else {
889                throw new IOException(
890                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
891              }
892            }
893            tableNotFound = false;
894            // convert the row result into the HRegionLocation we need!
895            RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
896            if (locations == null || locations.getRegionLocation(replicaId) == null) {
897              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
898            }
899            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
900            if (regionInfo == null) {
901              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
902                ", row=" + regionInfoRow);
903            }
904            // See HBASE-20182. It is possible that we locate to a split parent even after the
905            // children are online, so here we need to skip this region and go to the next one.
906            if (regionInfo.isSplitParent()) {
907              continue;
908            }
909            if (regionInfo.isOffline()) {
910              throw new RegionOfflineException("Region offline; disable table call? " +
911                  regionInfo.getRegionNameAsString());
912            }
913            // It is possible that the split children have not been online yet and we have skipped
914            // the parent in the above condition, so we may have already reached a region which does
915            // not contains us.
916            if (!regionInfo.containsRow(row)) {
917              throw new IOException(
918                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
919            }
920            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
921            if (serverName == null) {
922              throw new NoServerForRegionException("No server address listed in " +
923                TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
924                " containing row " + Bytes.toStringBinary(row));
925            }
926            if (isDeadServer(serverName)) {
927              throw new RegionServerStoppedException(
928                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
929                  " is managed by the server " + serverName + ", but it is dead.");
930            }
931            // Instantiate the location
932            cacheLocation(tableName, locations);
933            return locations;
934          }
935        }
936      } catch (TableNotFoundException e) {
937        // if we got this error, probably means the table just plain doesn't
938        // exist. rethrow the error immediately. this should always be coming
939        // from the HTable constructor.
940        throw e;
941      } catch (IOException e) {
942        ExceptionUtil.rethrowIfInterrupt(e);
943        if (e instanceof RemoteException) {
944          e = ((RemoteException)e).unwrapRemoteException();
945        }
946        if (e instanceof CallQueueTooBigException) {
947          // Give a special check on CallQueueTooBigException, see #HBASE-17114
948          pauseBase = this.pauseForCQTBE;
949        }
950        if (tries < maxAttempts - 1) {
951          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
952            "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
953        } else {
954          throw e;
955        }
956        // Only relocate the parent region if necessary
957        relocateMeta =
958          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
959      } finally {
960        userRegionLock.unlock();
961      }
962      try{
963        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
964      } catch (InterruptedException e) {
965        throw new InterruptedIOException("Giving up trying to location region in " +
966          "meta: thread is interrupted.");
967      }
968    }
969  }
970
  /**
   * Put a newly discovered HRegionLocation into the cache. Delegates to the shared
   * {@code metaCache}; existing entries for overlapping replicas are merged there.
   * @param tableName The table name.
   * @param location the new location
   */
  @Override
  public void cacheLocation(final TableName tableName, final RegionLocations location) {
    metaCache.cacheLocation(tableName, location);
  }
980
  /**
   * Search the cache for a location that fits our table and row key.
   * Return null if no suitable region is located.
   * @param tableName table to search the cache for
   * @param row row whose containing region is sought
   * @return Null or region location found in cache.
   */
  RegionLocations getCachedLocation(final TableName tableName,
      final byte [] row) {
    return metaCache.getCachedLocation(tableName, row);
  }
990
  /**
   * Remove from the cache the location of the region (all replicas) of {@code tableName}
   * containing {@code row}.
   */
  public void clearRegionCache(final TableName tableName, byte[] row) {
    metaCache.clearCache(tableName, row);
  }
994
  /**
   * Delete all cached entries of a table that maps to a specific location.
   * @param serverName server whose cached region locations should be dropped
   */
  @Override
  public void clearCaches(final ServerName serverName) {
    metaCache.clearCache(serverName);
  }
1002
  /**
   * Drop every cached region location for every table held by this connection.
   */
  @Override
  public void clearRegionLocationCache() {
    metaCache.clearCache();
  }
1007
  /**
   * Drop all cached region locations for the given table.
   */
  @Override
  public void clearRegionCache(final TableName tableName) {
    metaCache.clearCache(tableName);
  }
1012
  /**
   * Put a newly discovered HRegionLocation into the cache, recording which server reported it.
   * Used when a location is learned from somewhere other than a meta scan (e.g. a moved-region
   * response from a region server).
   * @param tableName The table name.
   * @param source the source of the new location, if it's not coming from meta
   * @param location the new location
   */
  private void cacheLocation(final TableName tableName, final ServerName source,
      final HRegionLocation location) {
    metaCache.cacheLocation(tableName, source, location);
  }
1023
1024  // Map keyed by service name + regionserver to service stub implementation
1025  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
1026
1027  /**
1028   * State of the MasterService connection/setup.
1029   */
1030  static class MasterServiceState {
1031    Connection connection;
1032
1033    MasterProtos.MasterService.BlockingInterface stub;
1034    int userCount;
1035
1036    MasterServiceState(final Connection connection) {
1037      super();
1038      this.connection = connection;
1039    }
1040
1041    @Override
1042    public String toString() {
1043      return "MasterService";
1044    }
1045
1046    Object getStub() {
1047      return this.stub;
1048    }
1049
1050    void clearStub() {
1051      this.stub = null;
1052    }
1053
1054    boolean isMasterRunning() throws IOException {
1055      MasterProtos.IsMasterRunningResponse response = null;
1056      try {
1057        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1058      } catch (Exception e) {
1059        throw ProtobufUtil.handleRemoteException(e);
1060      }
1061      return response != null? response.getIsMasterRunning(): false;
1062    }
1063  }
1064
1065  /**
1066   * The record of errors for servers.
1067   */
1068  static class ServerErrorTracker {
1069    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
1070    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>();
1071    private final long canRetryUntil;
1072    private final int maxTries;// max number to try
1073    private final long startTrackingTime;
1074
1075    /**
1076     * Constructor
1077     * @param timeout how long to wait before timeout, in unit of millisecond
1078     * @param maxTries how many times to try
1079     */
1080    public ServerErrorTracker(long timeout, int maxTries) {
1081      this.maxTries = maxTries;
1082      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
1083      this.startTrackingTime = new Date().getTime();
1084    }
1085
1086    /**
1087     * We stop to retry when we have exhausted BOTH the number of tries and the time allocated.
1088     * @param numAttempt how many times we have tried by now
1089     */
1090    boolean canTryMore(int numAttempt) {
1091      // If there is a single try we must not take into account the time.
1092      return numAttempt < maxTries || (maxTries > 1 &&
1093          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
1094    }
1095
1096    /**
1097     * Calculates the back-off time for a retrying request to a particular server.
1098     *
1099     * @param server    The server in question.
1100     * @param basePause The default hci pause.
1101     * @return The time to wait before sending next request.
1102     */
1103    long calculateBackoffTime(ServerName server, long basePause) {
1104      long result;
1105      ServerErrors errorStats = errorsByServer.get(server);
1106      if (errorStats != null) {
1107        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
1108      } else {
1109        result = 0; // yes, if the server is not in our list we don't wait before retrying.
1110      }
1111      return result;
1112    }
1113
1114    /**
1115     * Reports that there was an error on the server to do whatever bean-counting necessary.
1116     * @param server The server in question.
1117     */
1118    void reportServerError(ServerName server) {
1119      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
1120    }
1121
1122    long getStartTrackingTime() {
1123      return startTrackingTime;
1124    }
1125
1126    /**
1127     * The record of errors for a server.
1128     */
1129    private static class ServerErrors {
1130      private final AtomicInteger retries = new AtomicInteger(0);
1131
1132      public int getCount() {
1133        return retries.get();
1134      }
1135
1136      public void addError() {
1137        retries.incrementAndGet();
1138      }
1139    }
1140  }
1141
1142  /**
1143   * Class to make a MasterServiceStubMaker stub.
1144   */
1145  private final class MasterServiceStubMaker {
1146
1147    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
1148        throws IOException {
1149      try {
1150        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1151      } catch (ServiceException e) {
1152        throw ProtobufUtil.handleRemoteException(e);
1153      }
1154    }
1155
1156    /**
1157     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
1158     * services nor their interfaces. Let the caller do appropriate casting.
1159     * @return A stub for master services.
1160     */
1161    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
1162        throws IOException, KeeperException {
1163      ServerName sn = get(registry.getMasterAddress());
1164      if (sn == null) {
1165        String msg = "ZooKeeper available but no active master location found";
1166        LOG.info(msg);
1167        throw new MasterNotRunningException(msg);
1168      }
1169      if (isDeadServer(sn)) {
1170        throw new MasterNotRunningException(sn + " is dead.");
1171      }
1172      // Use the security info interface name as our stub key
1173      String key =
1174          getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
1175      MasterProtos.MasterService.BlockingInterface stub =
1176          (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1177            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1178            return MasterProtos.MasterService.newBlockingStub(channel);
1179          });
1180      isMasterRunning(stub);
1181      return stub;
1182    }
1183
1184    /**
1185     * Create a stub against the master. Retry if necessary.
1186     * @return A stub to do <code>intf</code> against the master
1187     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
1188     */
1189    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
1190      // The lock must be at the beginning to prevent multiple master creations
1191      // (and leaks) in a multithread context
1192      synchronized (masterLock) {
1193        Exception exceptionCaught = null;
1194        if (!closed) {
1195          try {
1196            return makeStubNoRetries();
1197          } catch (IOException e) {
1198            exceptionCaught = e;
1199          } catch (KeeperException e) {
1200            exceptionCaught = e;
1201          }
1202          throw new MasterNotRunningException(exceptionCaught);
1203        } else {
1204          throw new DoNotRetryIOException("Connection was closed while trying to get master");
1205        }
1206      }
1207    }
1208  }
1209
  /**
   * Get an admin stub for the server currently hosting the active master, as reported by the
   * registry.
   */
  @Override
  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
    return getAdmin(get(registry.getMasterAddress()));
  }
1214
1215  @Override
1216  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
1217      throws IOException {
1218    checkClosed();
1219    if (isDeadServer(serverName)) {
1220      throw new RegionServerStoppedException(serverName + " is dead.");
1221    }
1222    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
1223      this.hostnamesCanChange);
1224    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1225      BlockingRpcChannel channel =
1226          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1227      return AdminProtos.AdminService.newBlockingStub(channel);
1228    });
1229  }
1230
1231  @Override
1232  public BlockingInterface getClient(ServerName serverName) throws IOException {
1233    checkClosed();
1234    if (isDeadServer(serverName)) {
1235      throw new RegionServerStoppedException(serverName + " is dead.");
1236    }
1237    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
1238      serverName, this.hostnamesCanChange);
1239    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1240      BlockingRpcChannel channel =
1241          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1242      return ClientProtos.ClientService.newBlockingStub(channel);
1243    });
1244  }
1245
1246  final MasterServiceState masterServiceState = new MasterServiceState(this);
1247
  /**
   * Get a keep-alive connection to the active master, creating the underlying stub if needed.
   */
  @Override
  public MasterKeepAliveConnection getMaster() throws IOException {
    return getKeepAliveMasterService();
  }
1252
  // NOTE(review): despite the "reset" name, this only increments the keep-alive user count;
  // it records another caller holding the master connection.
  private void resetMasterServiceState(final MasterServiceState mss) {
    mss.userCount++;
  }
1256
1257  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
1258    synchronized (masterLock) {
1259      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1260        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1261        this.masterServiceState.stub = stubMaker.makeStub();
1262      }
1263      resetMasterServiceState(this.masterServiceState);
1264    }
1265    // Ugly delegation just so we can add in a Close method.
1266    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
1267    return new MasterKeepAliveConnection() {
1268      MasterServiceState mss = masterServiceState;
1269
1270      @Override
1271      public MasterProtos.AbortProcedureResponse abortProcedure(
1272          RpcController controller,
1273          MasterProtos.AbortProcedureRequest request) throws ServiceException {
1274        return stub.abortProcedure(controller, request);
1275      }
1276
1277      @Override
1278      public MasterProtos.GetProceduresResponse getProcedures(
1279          RpcController controller,
1280          MasterProtos.GetProceduresRequest request) throws ServiceException {
1281        return stub.getProcedures(controller, request);
1282      }
1283
1284      @Override
1285      public MasterProtos.GetLocksResponse getLocks(
1286          RpcController controller,
1287          MasterProtos.GetLocksRequest request) throws ServiceException {
1288        return stub.getLocks(controller, request);
1289      }
1290
1291      @Override
1292      public MasterProtos.AddColumnResponse addColumn(
1293          RpcController controller,
1294          MasterProtos.AddColumnRequest request) throws ServiceException {
1295        return stub.addColumn(controller, request);
1296      }
1297
1298      @Override
1299      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
1300          MasterProtos.DeleteColumnRequest request)
1301      throws ServiceException {
1302        return stub.deleteColumn(controller, request);
1303      }
1304
1305      @Override
1306      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
1307          MasterProtos.ModifyColumnRequest request)
1308      throws ServiceException {
1309        return stub.modifyColumn(controller, request);
1310      }
1311
1312      @Override
1313      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
1314          MasterProtos.MoveRegionRequest request) throws ServiceException {
1315        return stub.moveRegion(controller, request);
1316      }
1317
1318      @Override
1319      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(
1320          RpcController controller, MasterProtos.MergeTableRegionsRequest request)
1321          throws ServiceException {
1322        return stub.mergeTableRegions(controller, request);
1323      }
1324
1325      @Override
1326      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
1327          MasterProtos.AssignRegionRequest request) throws ServiceException {
1328        return stub.assignRegion(controller, request);
1329      }
1330
1331      @Override
1332      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
1333          MasterProtos.UnassignRegionRequest request) throws ServiceException {
1334        return stub.unassignRegion(controller, request);
1335      }
1336
1337      @Override
1338      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
1339          MasterProtos.OfflineRegionRequest request) throws ServiceException {
1340        return stub.offlineRegion(controller, request);
1341      }
1342
1343      @Override
1344      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
1345          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
1346        return stub.splitRegion(controller, request);
1347      }
1348
1349      @Override
1350      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
1351          MasterProtos.DeleteTableRequest request) throws ServiceException {
1352        return stub.deleteTable(controller, request);
1353      }
1354
1355      @Override
1356      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
1357          MasterProtos.TruncateTableRequest request) throws ServiceException {
1358        return stub.truncateTable(controller, request);
1359      }
1360
1361      @Override
1362      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
1363          MasterProtos.EnableTableRequest request) throws ServiceException {
1364        return stub.enableTable(controller, request);
1365      }
1366
1367      @Override
1368      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
1369          MasterProtos.DisableTableRequest request) throws ServiceException {
1370        return stub.disableTable(controller, request);
1371      }
1372
1373      @Override
1374      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
1375          MasterProtos.ModifyTableRequest request) throws ServiceException {
1376        return stub.modifyTable(controller, request);
1377      }
1378
1379      @Override
1380      public MasterProtos.CreateTableResponse createTable(RpcController controller,
1381          MasterProtos.CreateTableRequest request) throws ServiceException {
1382        return stub.createTable(controller, request);
1383      }
1384
1385      @Override
1386      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
1387          MasterProtos.ShutdownRequest request) throws ServiceException {
1388        return stub.shutdown(controller, request);
1389      }
1390
1391      @Override
1392      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
1393          MasterProtos.StopMasterRequest request) throws ServiceException {
1394        return stub.stopMaster(controller, request);
1395      }
1396
1397      @Override
1398      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1399          final RpcController controller,
1400          final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
1401        return stub.isMasterInMaintenanceMode(controller, request);
1402      }
1403
1404      @Override
1405      public MasterProtos.BalanceResponse balance(RpcController controller,
1406          MasterProtos.BalanceRequest request) throws ServiceException {
1407        return stub.balance(controller, request);
1408      }
1409
1410      @Override
1411      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(
1412          RpcController controller, MasterProtos.SetBalancerRunningRequest request)
1413          throws ServiceException {
1414        return stub.setBalancerRunning(controller, request);
1415      }
1416
1417      @Override
1418      public NormalizeResponse normalize(RpcController controller,
1419          NormalizeRequest request) throws ServiceException {
1420        return stub.normalize(controller, request);
1421      }
1422
1423      @Override
1424      public SetNormalizerRunningResponse setNormalizerRunning(
1425          RpcController controller, SetNormalizerRunningRequest request)
1426          throws ServiceException {
1427        return stub.setNormalizerRunning(controller, request);
1428      }
1429
      // Catalog-janitor and cleaner-chore RPCs: each method below forwards the
      // call, unchanged, to the wrapped master stub of this keep-alive connection.
      @Override
      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
          MasterProtos.RunCatalogScanRequest request) throws ServiceException {
        return stub.runCatalogScan(controller, request);
      }

      @Override
      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
          RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
          throws ServiceException {
        return stub.enableCatalogJanitor(controller, request);
      }

      @Override
      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
          RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
          throws ServiceException {
        return stub.isCatalogJanitorEnabled(controller, request);
      }

      @Override
      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
          MasterProtos.RunCleanerChoreRequest request)
          throws ServiceException {
        return stub.runCleanerChore(controller, request);
      }

      @Override
      public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning(
          RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request)
          throws ServiceException {
        return stub.setCleanerChoreRunning(controller, request);
      }

      @Override
      public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled(
          RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request)
          throws ServiceException {
        return stub.isCleanerChoreEnabled(controller, request);
      }
1470
      // Coprocessor, snapshot and procedure RPCs: each method below forwards the
      // call, unchanged, to the wrapped master stub.
      @Override
      public ClientProtos.CoprocessorServiceResponse execMasterService(
          RpcController controller, ClientProtos.CoprocessorServiceRequest request)
          throws ServiceException {
        return stub.execMasterService(controller, request);
      }

      @Override
      public MasterProtos.SnapshotResponse snapshot(RpcController controller,
          MasterProtos.SnapshotRequest request) throws ServiceException {
        return stub.snapshot(controller, request);
      }

      @Override
      public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
          RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
          throws ServiceException {
        return stub.getCompletedSnapshots(controller, request);
      }

      @Override
      public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
          MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
        return stub.deleteSnapshot(controller, request);
      }

      @Override
      public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
          MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
        return stub.isSnapshotDone(controller, request);
      }

      @Override
      public MasterProtos.RestoreSnapshotResponse restoreSnapshot(
          RpcController controller, MasterProtos.RestoreSnapshotRequest request)
          throws ServiceException {
        return stub.restoreSnapshot(controller, request);
      }

      @Override
      public MasterProtos.ExecProcedureResponse execProcedure(
          RpcController controller, MasterProtos.ExecProcedureRequest request)
          throws ServiceException {
        return stub.execProcedure(controller, request);
      }

      // Same request/response types as execProcedure; the master decides how the
      // returned data differs for the with-return variant.
      @Override
      public MasterProtos.ExecProcedureResponse execProcedureWithRet(
          RpcController controller, MasterProtos.ExecProcedureRequest request)
          throws ServiceException {
        return stub.execProcedureWithRet(controller, request);
      }

      @Override
      public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
          MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
        return stub.isProcedureDone(controller, request);
      }
1529
      // Procedure-result, master-liveness, namespace and table-state RPCs:
      // each method below forwards the call, unchanged, to the wrapped master stub.
      @Override
      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
          MasterProtos.GetProcedureResultRequest request) throws ServiceException {
        return stub.getProcedureResult(controller, request);
      }

      @Override
      public MasterProtos.IsMasterRunningResponse isMasterRunning(
          RpcController controller, MasterProtos.IsMasterRunningRequest request)
          throws ServiceException {
        return stub.isMasterRunning(controller, request);
      }

      @Override
      public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller,
          MasterProtos.ModifyNamespaceRequest request)
      throws ServiceException {
        return stub.modifyNamespace(controller, request);
      }

      @Override
      public MasterProtos.CreateNamespaceResponse createNamespace(
          RpcController controller,
          MasterProtos.CreateNamespaceRequest request) throws ServiceException {
        return stub.createNamespace(controller, request);
      }

      @Override
      public MasterProtos.DeleteNamespaceResponse deleteNamespace(
          RpcController controller,
          MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
        return stub.deleteNamespace(controller, request);
      }

      @Override
      public MasterProtos.ListNamespacesResponse listNamespaces(
          RpcController controller,
          MasterProtos.ListNamespacesRequest request) throws ServiceException {
        return stub.listNamespaces(controller, request);
      }

      @Override
      public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
          RpcController controller,
          MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
        return stub.getNamespaceDescriptor(controller, request);
      }

      @Override
      public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
          RpcController controller,
          MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
        return stub.listNamespaceDescriptors(controller, request);
      }

      @Override
      public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
          RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
              throws ServiceException {
        return stub.listTableDescriptorsByNamespace(controller, request);
      }

      @Override
      public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
          RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
              throws ServiceException {
        return stub.listTableNamesByNamespace(controller, request);
      }

      @Override
      public MasterProtos.GetTableStateResponse getTableState(
              RpcController controller, MasterProtos.GetTableStateRequest request)
              throws ServiceException {
        return stub.getTableState(controller, request);
      }
1605
      @Override
      public void close() {
        // Does NOT close the underlying RPC stub: it only releases this
        // caller's hold on the shared keep-alive master service state, so the
        // connection can reclaim it once all users are done.
        release(this.mss);
      }
1610
      // Table-metadata, cluster-status, quota, compaction-timestamp and balancer
      // RPCs: each method below forwards the call, unchanged, to the wrapped
      // master stub.
      @Override
      public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
          RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
          throws ServiceException {
        return stub.getSchemaAlterStatus(controller, request);
      }

      @Override
      public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(
          RpcController controller, MasterProtos.GetTableDescriptorsRequest request)
          throws ServiceException {
        return stub.getTableDescriptors(controller, request);
      }

      @Override
      public MasterProtos.GetTableNamesResponse getTableNames(
          RpcController controller, MasterProtos.GetTableNamesRequest request)
          throws ServiceException {
        return stub.getTableNames(controller, request);
      }

      @Override
      public MasterProtos.GetClusterStatusResponse getClusterStatus(
          RpcController controller, MasterProtos.GetClusterStatusRequest request)
          throws ServiceException {
        return stub.getClusterStatus(controller, request);
      }

      @Override
      public MasterProtos.SetQuotaResponse setQuota(
          RpcController controller, MasterProtos.SetQuotaRequest request)
          throws ServiceException {
        return stub.setQuota(controller, request);
      }

      @Override
      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
          RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
          throws ServiceException {
        return stub.getLastMajorCompactionTimestamp(controller, request);
      }

      @Override
      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
          RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
          throws ServiceException {
        return stub.getLastMajorCompactionTimestampForRegion(controller, request);
      }

      @Override
      public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
          IsBalancerEnabledRequest request) throws ServiceException {
        return stub.isBalancerEnabled(controller, request);
      }
1665
      // Split/merge switches, normalizer, security-capability and replication-peer
      // RPCs: each method below forwards the call, unchanged, to the wrapped
      // master stub.
      @Override
      public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
        RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request)
        throws ServiceException {
        return stub.setSplitOrMergeEnabled(controller, request);
      }

      @Override
      public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
        RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request)
              throws ServiceException {
        return stub.isSplitOrMergeEnabled(controller, request);
      }

      @Override
      public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
          IsNormalizerEnabledRequest request) throws ServiceException {
        return stub.isNormalizerEnabled(controller, request);
      }

      @Override
      public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
          SecurityCapabilitiesRequest request) throws ServiceException {
        return stub.getSecurityCapabilities(controller, request);
      }

      @Override
      public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
          AddReplicationPeerRequest request) throws ServiceException {
        return stub.addReplicationPeer(controller, request);
      }

      @Override
      public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
          RemoveReplicationPeerRequest request) throws ServiceException {
        return stub.removeReplicationPeer(controller, request);
      }

      @Override
      public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
          EnableReplicationPeerRequest request) throws ServiceException {
        return stub.enableReplicationPeer(controller, request);
      }

      @Override
      public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
          DisableReplicationPeerRequest request) throws ServiceException {
        return stub.disableReplicationPeer(controller, request);
      }

      @Override
      public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
          ListDecommissionedRegionServersRequest request) throws ServiceException {
        return stub.listDecommissionedRegionServers(controller, request);
      }
1721
      // Region-server decommissioning, replication-peer-config, space-quota and
      // dead-server RPCs: each method below forwards the call, unchanged, to the
      // wrapped master stub.
      @Override
      public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
          DecommissionRegionServersRequest request) throws ServiceException {
        return stub.decommissionRegionServers(controller, request);
      }

      @Override
      public RecommissionRegionServerResponse recommissionRegionServer(
          RpcController controller, RecommissionRegionServerRequest request)
          throws ServiceException {
        return stub.recommissionRegionServer(controller, request);
      }

      @Override
      public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
          GetReplicationPeerConfigRequest request) throws ServiceException {
        return stub.getReplicationPeerConfig(controller, request);
      }

      @Override
      public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig(
          RpcController controller, UpdateReplicationPeerConfigRequest request)
          throws ServiceException {
        return stub.updateReplicationPeerConfig(controller, request);
      }

      @Override
      public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
          ListReplicationPeersRequest request) throws ServiceException {
        return stub.listReplicationPeers(controller, request);
      }

      @Override
      public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
          RpcController controller, GetSpaceQuotaRegionSizesRequest request)
          throws ServiceException {
        return stub.getSpaceQuotaRegionSizes(controller, request);
      }

      @Override
      public GetQuotaStatesResponse getQuotaStates(
          RpcController controller, GetQuotaStatesRequest request) throws ServiceException {
        return stub.getQuotaStates(controller, request);
      }

      @Override
      public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller,
          MasterProtos.ClearDeadServersRequest request) throws ServiceException {
        return stub.clearDeadServers(controller, request);
      }
1772
      // Sync-replication state, RPC-throttle and access-control RPCs: each method
      // below forwards the call, unchanged, to the wrapped master stub.
      @Override
      public TransitReplicationPeerSyncReplicationStateResponse
        transitReplicationPeerSyncReplicationState(RpcController controller,
          TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException {
        return stub.transitReplicationPeerSyncReplicationState(controller, request);
      }

      @Override
      public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
          SwitchRpcThrottleRequest request) throws ServiceException {
        return stub.switchRpcThrottle(controller, request);
      }

      @Override
      public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
          IsRpcThrottleEnabledRequest request) throws ServiceException {
        return stub.isRpcThrottleEnabled(controller, request);
      }

      @Override
      public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller,
          SwitchExceedThrottleQuotaRequest request) throws ServiceException {
        return stub.switchExceedThrottleQuota(controller, request);
      }

      @Override
      public AccessControlProtos.GrantResponse grant(RpcController controller,
          AccessControlProtos.GrantRequest request) throws ServiceException {
        return stub.grant(controller, request);
      }

      @Override
      public AccessControlProtos.RevokeResponse revoke(RpcController controller,
          AccessControlProtos.RevokeRequest request) throws ServiceException {
        return stub.revoke(controller, request);
      }

      @Override
      public GetUserPermissionsResponse getUserPermissions(RpcController controller,
          GetUserPermissionsRequest request) throws ServiceException {
        return stub.getUserPermissions(controller, request);
      }

      @Override
      public HasUserPermissionsResponse hasUserPermissions(RpcController controller,
          HasUserPermissionsRequest request) throws ServiceException {
        return stub.hasUserPermissions(controller, request);
      }
1821    };
1822  }
1823
1824  private static void release(MasterServiceState mss) {
1825    if (mss != null && mss.connection != null) {
1826      ((ConnectionImplementation)mss.connection).releaseMaster(mss);
1827    }
1828  }
1829
  /**
   * Probes whether the keep-alive master connection behind {@code mss} is still usable.
   * @param mss shared master service state to check
   * @return true only if a stub is present and the master answers the liveness RPC;
   *   false when there is no stub or the probe fails for any reason
   */
  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
    if (mss.getStub() == null){
      return false;
    }
    try {
      return mss.isMasterRunning();
    } catch (UndeclaredThrowableException e) {
      // It's somehow messy, but we can receive exceptions such as
      //  java.net.ConnectException but they're not declared. So we catch it...
      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
      return false;
    } catch (IOException se) {
      // Treat any I/O failure during the probe as "not running"; the caller
      // will re-establish a fresh master connection.
      LOG.warn("Checking master connection", se);
      return false;
    }
  }
1846
1847  void releaseMaster(MasterServiceState mss) {
1848    if (mss.getStub() == null) {
1849      return;
1850    }
1851    synchronized (masterLock) {
1852      --mss.userCount;
1853    }
1854  }
1855
1856  private void closeMasterService(MasterServiceState mss) {
1857    if (mss.getStub() != null) {
1858      LOG.info("Closing master protocol: " + mss);
1859      mss.clearStub();
1860    }
1861    mss.userCount = 0;
1862  }
1863
1864  /**
1865   * Immediate close of the shared master. Can be by the delayed close or when closing the
1866   * connection itself.
1867   */
1868  private void closeMaster() {
1869    synchronized (masterLock) {
1870      closeMasterService(masterServiceState);
1871    }
1872  }
1873
1874  void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) {
1875    HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
1876    cacheLocation(hri.getTable(), source, newHrl);
1877  }
1878
  /** Drops the cached entry for the given location from the meta cache. */
  @Override
  public void deleteCachedRegionLocation(final HRegionLocation location) {
    metaCache.clearCache(location);
  }
1883
1884  /**
1885   * Update the location with the new value (if the exception is a RegionMovedException)
1886   * or delete it from the cache. Does nothing if we can be sure from the exception that
1887   * the location is still accurate, or if the cache has already been updated.
1888   * @param exception an object (to simplify user code) on which we will try to find a nested
1889   *   or wrapped or both RegionMovedException
1890   * @param source server that is the source of the location update.
1891   */
1892  @Override
1893  public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
1894    final Object exception, final ServerName source) {
1895    if (rowkey == null || tableName == null) {
1896      LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
1897          ", tableName=" + (tableName == null ? "null" : tableName));
1898      return;
1899    }
1900
1901    if (source == null) {
1902      // This should not happen, but let's secure ourselves.
1903      return;
1904    }
1905
1906    if (regionName == null) {
1907      // we do not know which region, so just remove the cache entry for the row and server
1908      if (metrics != null) {
1909        metrics.incrCacheDroppingExceptions(exception);
1910      }
1911      metaCache.clearCache(tableName, rowkey, source);
1912      return;
1913    }
1914
1915    // Is it something we have already updated?
1916    final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
1917    HRegionLocation oldLocation = null;
1918    if (oldLocations != null) {
1919      oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
1920    }
1921    if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
1922      // There is no such location in the cache (it's been removed already) or
1923      // the cache has already been refreshed with a different location.  => nothing to do
1924      return;
1925    }
1926
1927    RegionInfo regionInfo = oldLocation.getRegion();
1928    Throwable cause = ClientExceptionsUtil.findException(exception);
1929    if (cause != null) {
1930      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
1931        // We know that the region is still on this region server
1932        return;
1933      }
1934
1935      if (cause instanceof RegionMovedException) {
1936        RegionMovedException rme = (RegionMovedException) cause;
1937        if (LOG.isTraceEnabled()) {
1938          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
1939              rme.getHostname() + ":" + rme.getPort() +
1940              " according to " + source.getAddress());
1941        }
1942        // We know that the region is not anymore on this region server, but we know
1943        //  the new location.
1944        updateCachedLocation(
1945            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
1946        return;
1947      }
1948    }
1949
1950    if (metrics != null) {
1951      metrics.incrCacheDroppingExceptions(exception);
1952    }
1953
1954    // If we're here, it means that can cannot be sure about the location, so we remove it from
1955    // the cache. Do not send the source because source can be a new server in the same host:port
1956    metaCache.clearCache(regionInfo);
1957  }
1958
  /** @return the async process instance held by this connection */
  @Override
  public AsyncProcess getAsyncProcess() {
    return asyncProcess;
  }

  /** @return the per-server statistics tracker held by this connection */
  @Override
  public ServerStatisticTracker getStatisticsTracker() {
    return this.stats;
  }

  /** @return the client backoff policy held by this connection */
  @Override
  public ClientBackoffPolicy getBackoffPolicy() {
    return this.backoffPolicy;
  }
1973
1974  /*
1975   * Return the number of cached region for a table. It will only be called
1976   * from a unit test.
1977   */
1978  @VisibleForTesting
1979  int getNumberOfCachedRegionLocations(final TableName tableName) {
1980    return metaCache.getNumberOfCachedRegionLocations(tableName);
1981  }
1982
  /**
   * Aborts this connection: logs the reason at FATAL, marks the connection
   * aborted, then closes it.
   */
  @Override
  public void abort(final String msg, Throwable t) {
    if (t != null) {
      LOG.error(HBaseMarkers.FATAL, msg, t);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }
    // Set aborted before close() so anything observing the connection during
    // shutdown sees it as aborted.
    this.aborted = true;
    close();
    this.closed = true;
  }
1994
  /** @return true once {@link #close()} has completed (or abort has closed us) */
  @Override
  public boolean isClosed() {
    return this.closed;
  }

  /** @return true once {@link #abort(String, Throwable)} has been invoked */
  @Override
  public boolean isAborted(){
    return this.aborted;
  }
2004
  /**
   * @return the current number of region servers, as reported by the registry
   * @throws IOException if the registry lookup fails or is interrupted
   */
  @Override
  public int getCurrentNrHRS() throws IOException {
    return get(this.registry.getCurrentNrHRS());
  }
2009
  /**
   * Closes this connection and all of its helper services. Idempotent: a second
   * call returns immediately once {@code closed} is set.
   */
  @Override
  public void close() {
    if (this.closed) {
      return;
    }
    // Release the shared master connection and thread pools before tearing
    // down the lower-level services they depend on.
    closeMaster();
    shutdownPools();
    if (this.metrics != null) {
      this.metrics.shutdown();
    }
    this.closed = true;
    registry.close();
    this.stubs.clear();
    if (clusterStatusListener != null) {
      clusterStatusListener.close();
    }
    if (rpcClient != null) {
      rpcClient.close();
    }
    if (authService != null) {
      authService.shutdown();
    }
  }
2033
2034  /**
2035   * Close the connection for good. On the off chance that someone is unable to close
2036   * the connection, perhaps because it bailed out prematurely, the method
2037   * below will ensure that this instance is cleaned up.
2038   * Caveat: The JVM may take an unknown amount of time to call finalize on an
2039   * unreachable object, so our hope is that every consumer cleans up after
2040   * itself, like any good citizen.
2041   */
2042  @Override
2043  protected void finalize() throws Throwable {
2044    super.finalize();
2045    close();
2046  }
2047
  /** @return the nonce generator held by this connection */
  @Override
  public NonceGenerator getNonceGenerator() {
    return nonceGenerator;
  }
2052
2053  @Override
2054  public TableState getTableState(TableName tableName) throws IOException {
2055    checkClosed();
2056    TableState tableState = MetaTableAccessor.getTableState(this, tableName);
2057    if (tableState == null) {
2058      throw new TableNotFoundException(tableName);
2059    }
2060    return tableState;
2061  }
2062
  /**
   * Builds a fresh retrying-caller factory from the given configuration, reusing
   * this connection's interceptor and statistics tracker.
   */
  @Override
  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
    return RpcRetryingCallerFactory
        .instantiate(conf, this.interceptor, this.getStatisticsTracker());
  }
2068
  /** @return whether the underlying RPC client supports cell blocks */
  @Override
  public boolean hasCellBlockSupport() {
    return this.rpcClient.hasCellBlockSupport();
  }

  /** @return the connection configuration held by this connection */
  @Override
  public ConnectionConfiguration getConnectionConfiguration() {
    return this.connectionConfig;
  }

  /** @return the shared retrying-caller factory held by this connection */
  @Override
  public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
    return this.rpcCallerFactory;
  }

  /** @return the RPC controller factory held by this connection */
  @Override
  public RpcControllerFactory getRpcControllerFactory() {
    return this.rpcControllerFactory;
  }
2088
2089  private static <T> T get(CompletableFuture<T> future) throws IOException {
2090    try {
2091      return future.get();
2092    } catch (InterruptedException e) {
2093      Thread.currentThread().interrupt();
2094      throw (IOException) new InterruptedIOException().initCause(e);
2095    } catch (ExecutionException e) {
2096      Throwable cause = e.getCause();
2097      Throwables.propagateIfPossible(cause, IOException.class);
2098      throw new IOException(cause);
2099    }
2100  }
2101}