001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
024import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
025import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
026import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx;
027
028import edu.umd.cs.findbugs.annotations.Nullable;
029import java.io.Closeable;
030import java.io.IOException;
031import java.io.InterruptedIOException;
032import java.lang.reflect.UndeclaredThrowableException;
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.Date;
036import java.util.List;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.LinkedBlockingQueue;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.atomic.AtomicInteger;
047import java.util.concurrent.locks.ReentrantLock;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.AuthUtil;
050import org.apache.hadoop.hbase.CallQueueTooBigException;
051import org.apache.hadoop.hbase.ChoreService;
052import org.apache.hadoop.hbase.DoNotRetryIOException;
053import org.apache.hadoop.hbase.HConstants;
054import org.apache.hadoop.hbase.HRegionLocation;
055import org.apache.hadoop.hbase.MasterNotRunningException;
056import org.apache.hadoop.hbase.MetaTableAccessor;
057import org.apache.hadoop.hbase.RegionLocations;
058import org.apache.hadoop.hbase.ServerName;
059import org.apache.hadoop.hbase.TableName;
060import org.apache.hadoop.hbase.TableNotEnabledException;
061import org.apache.hadoop.hbase.TableNotFoundException;
062import org.apache.hadoop.hbase.ZooKeeperConnectionException;
063import org.apache.hadoop.hbase.client.Scan.ReadType;
064import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
065import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
066import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
067import org.apache.hadoop.hbase.exceptions.RegionMovedException;
068import org.apache.hadoop.hbase.ipc.RpcClient;
069import org.apache.hadoop.hbase.ipc.RpcClientFactory;
070import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
071import org.apache.hadoop.hbase.log.HBaseMarkers;
072import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
073import org.apache.hadoop.hbase.security.User;
074import org.apache.hadoop.hbase.util.Bytes;
075import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
076import org.apache.hadoop.hbase.util.ExceptionUtil;
077import org.apache.hadoop.hbase.util.Pair;
078import org.apache.hadoop.hbase.util.ReflectionUtils;
079import org.apache.hadoop.hbase.util.Threads;
080import org.apache.hadoop.ipc.RemoteException;
081import org.apache.hadoop.security.UserGroupInformation;
082import org.apache.yetus.audience.InterfaceAudience;
083import org.apache.zookeeper.KeeperException;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
088import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
089import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
090import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
091import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
092
093import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
094import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
144
145/**
146 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
147 * Encapsulates connection to zookeeper and regionservers.
148 */
149@edu.umd.cs.findbugs.annotations.SuppressWarnings(
150    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
151    justification="Access to the conncurrent hash map is under a lock so should be fine.")
152@InterfaceAudience.Private
153class ConnectionImplementation implements ClusterConnection, Closeable {
154  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
155  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
156
157  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
158
159  private final boolean hostnamesCanChange;
160  private final long pause;
161  private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
162  private boolean useMetaReplicas;
163  private final int metaReplicaCallTimeoutScanInMicroSecond;
164  private final int numTries;
165  final int rpcTimeout;
166
167  /**
   * Global nonceGenerator shared by all connections in this JVM. Currently there's no reason to
   * limit its scope.
169   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
170   */
171  private static volatile NonceGenerator nonceGenerator = null;
172  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
173  private static final Object nonceGeneratorCreateLock = new Object();
174
175  private final AsyncProcess asyncProcess;
176  // single tracker per connection
177  private final ServerStatisticTracker stats;
178
179  private volatile boolean closed;
180  private volatile boolean aborted;
181
182  // package protected for the tests
183  ClusterStatusListener clusterStatusListener;
184
185  private final Object metaRegionLock = new Object();
186
187  private final Object masterLock = new Object();
188
189  // thread executor shared by all Table instances created
190  // by this connection
191  private volatile ThreadPoolExecutor batchPool = null;
192  // meta thread executor shared by all Table instances created
193  // by this connection
194  private volatile ThreadPoolExecutor metaLookupPool = null;
195  private volatile boolean cleanupPool = false;
196
197  private final Configuration conf;
198
199  // cache the configuration value for tables so that we can avoid calling
200  // the expensive Configuration to fetch the value multiple times.
201  private final ConnectionConfiguration connectionConfig;
202
203  // Client rpc instance.
204  private final RpcClient rpcClient;
205
206  private final MetaCache metaCache;
207  private final MetricsConnection metrics;
208
209  protected User user;
210
211  private final RpcRetryingCallerFactory rpcCallerFactory;
212
213  private final RpcControllerFactory rpcControllerFactory;
214
215  private final RetryingCallerInterceptor interceptor;
216
217  /**
218   * Cluster registry of basic info such as clusterid and meta region location.
219   */
220  private final AsyncRegistry registry;
221
222  private final ClientBackoffPolicy backoffPolicy;
223
224  /**
225   * Allow setting an alternate BufferedMutator implementation via
226   * config. If null, use default.
227   */
228  private final String alternateBufferedMutatorClassName;
229
230  /** lock guards against multiple threads trying to query the meta region at the same time */
231  private final ReentrantLock userRegionLock = new ReentrantLock();
232
233  private ChoreService authService;
234
  /**
   * Creates a connection and eagerly sets up its registry, RPC client and caches.
   * <p>
   * Client tunables (pauses, retries, RPC timeout, nonces, client-side metrics, cluster status
   * listening, alternate BufferedMutator class) are read from {@code conf} once, here.
   *
   * @param conf Configuration object
   * @param pool executor used as this connection's batch pool; it is cast to
   *          {@link ThreadPoolExecutor}. When {@code null}, {@code getBatchPool()} lazily creates
   *          a shared pool and marks it for cleanup instead.
   * @param user user to connect as; if it logged in from a keytab, a relogin chore is started
   * @throws IOException if construction fails; any failure while creating the registry, the RPC
   *           client or the status listener makes this constructor call {@link #close()} before
   *           rethrowing
   */
  ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
    this.conf = conf;
    this.user = user;
    // NOTE(review): the relogin chore is started before the guarded try block below, so an
    // exception thrown before that block (e.g. the pool cast) never reaches close() and may leave
    // the chore running. Confirm whether that is acceptable.
    if (user != null && user.isLoginFromKeytab()) {
      spawnRenewalChore(user.getUGI());
    }
    this.batchPool = (ThreadPoolExecutor) pool;
    this.connectionConfig = new ConnectionConfiguration(conf);
    this.closed = false;
    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    // The CallQueueTooBigException pause may not be shorter than the normal pause; clamp it up.
    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
    if (configuredPauseForCQTBE < pause) {
      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
          + ", will use " + pause + " instead.");
      this.pauseForCQTBE = pause;
    } else {
      this.pauseForCQTBE = configuredPauseForCQTBE;
    }
    this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
      HConstants.DEFAULT_USE_META_REPLICAS);
    this.metaReplicaCallTimeoutScanInMicroSecond =
        connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();

    // how many times to try, one more than max *retry* time
    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
    this.rpcTimeout = conf.getInt(
        HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // nonceGenerator is a static field shared by every connection in the JVM. With nonces enabled
    // the shared generator is created once, under the lock.
    // NOTE(review): with nonces disabled, the shared generator is overwritten (outside the lock)
    // for every other connection as well. Confirm this is intended.
    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
      synchronized (nonceGeneratorCreateLock) {
        if (nonceGenerator == null) {
          nonceGenerator = PerClientRandomNonceGenerator.get();
        }
      }
    } else {
      nonceGenerator = NO_NONCE_GENERATOR;
    }

    this.stats = ServerStatisticTracker.create(conf);
    this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
    // Metrics get pool suppliers rather than pools; presumably so they can observe pools that
    // are only created lazily later.
    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
      this.metrics =
        new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
    } else {
      this.metrics = null;
    }
    this.metaCache = new MetaCache(this.metrics);

    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
    Class<? extends ClusterStatusListener.Listener> listenerClass =
        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
            ClusterStatusListener.Listener.class);

    // Is there an alternate BufferedMutator to use?
    this.alternateBufferedMutatorClassName =
        this.conf.get(BufferedMutator.CLASSNAME_KEY);

    // Registry, RPC client and status listener are created inside this try so that close() can
    // release whatever was already created if a later step fails.
    try {
      this.registry = AsyncRegistryFactory.getRegistry(conf);
      retrieveClusterId();

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);

      // Do we publish the status?
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          // On a dead-server notification, drop cached locations and open RPC connections to it.
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    } catch (Throwable e) {
      // avoid leaks: registry, rpcClient, ...
      LOG.debug("connection construction failed", e);
      close();
      throw e;
    }
  }
334
335  private void spawnRenewalChore(final UserGroupInformation user) {
336    authService = new ChoreService("Relogin service");
337    authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
338  }
339
340  /**
341   * @param useMetaReplicas
342   */
343  @VisibleForTesting
344  void setUseMetaReplicas(final boolean useMetaReplicas) {
345    this.useMetaReplicas = useMetaReplicas;
346  }
347
348  /**
349   * @param conn The connection for which to replace the generator.
350   * @param cnm Replaces the nonce generator used, for testing.
351   * @return old nonce generator.
352   */
353  @VisibleForTesting
354  static NonceGenerator injectNonceGeneratorForTesting(
355      ClusterConnection conn, NonceGenerator cnm) {
356    ConnectionImplementation connImpl = (ConnectionImplementation)conn;
357    NonceGenerator ng = connImpl.getNonceGenerator();
358    LOG.warn("Nonce generator is being replaced by test code for "
359      + cnm.getClass().getName());
360    nonceGenerator = cnm;
361    return ng;
362  }
363
364  @Override
365  public Table getTable(TableName tableName) throws IOException {
366    return getTable(tableName, getBatchPool());
367  }
368
369  @Override
370  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
371    return new TableBuilderBase(tableName, connectionConfig) {
372
373      @Override
374      public Table build() {
375        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
376            rpcControllerFactory, pool);
377      }
378    };
379  }
380
381  @Override
382  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
383    if (params.getTableName() == null) {
384      throw new IllegalArgumentException("TableName cannot be null.");
385    }
386    if (params.getPool() == null) {
387      params.pool(HTable.getDefaultExecutor(getConfiguration()));
388    }
389    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
390      params.writeBufferSize(connectionConfig.getWriteBufferSize());
391    }
392    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
393      params.setWriteBufferPeriodicFlushTimeoutMs(
394              connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
395    }
396    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
397      params.setWriteBufferPeriodicFlushTimerTickMs(
398              connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
399    }
400    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
401      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
402    }
403    // Look to see if an alternate BufferedMutation implementation is wanted.
404    // Look in params and in config. If null, use default.
405    String implementationClassName = params.getImplementationClassName();
406    if (implementationClassName == null) {
407      implementationClassName = this.alternateBufferedMutatorClassName;
408    }
409    if (implementationClassName == null) {
410      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
411    }
412    try {
413      return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
414          this, rpcCallerFactory, rpcControllerFactory, params);
415    } catch (ClassNotFoundException e) {
416      throw new RuntimeException(e);
417    }
418  }
419
420  @Override
421  public BufferedMutator getBufferedMutator(TableName tableName) {
422    return getBufferedMutator(new BufferedMutatorParams(tableName));
423  }
424
425  @Override
426  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
427    return new HRegionLocator(tableName, this);
428  }
429
430  @Override
431  public Admin getAdmin() throws IOException {
432    return new HBaseAdmin(this);
433  }
434
435  @Override
436  public Hbck getHbck() throws IOException {
437    return getHbck(get(registry.getMasterAddress()));
438  }
439
440  @Override
441  public Hbck getHbck(ServerName masterServer) throws IOException {
442    checkClosed();
443    if (isDeadServer(masterServer)) {
444      throw new RegionServerStoppedException(masterServer + " is dead.");
445    }
446    String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
447      masterServer, this.hostnamesCanChange);
448
449    return new HBaseHbck(
450      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
451        BlockingRpcChannel channel =
452          this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
453        return MasterProtos.HbckService.newBlockingStub(channel);
454      }), rpcControllerFactory);
455  }
456
457  @Override
458  public MetricsConnection getConnectionMetrics() {
459    return this.metrics;
460  }
461
462  private ThreadPoolExecutor getBatchPool() {
463    if (batchPool == null) {
464      synchronized (this) {
465        if (batchPool == null) {
466          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
467          this.batchPool = getThreadPool(threads, threads, "-shared", null);
468          this.cleanupPool = true;
469        }
470      }
471    }
472    return this.batchPool;
473  }
474
475  private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
476      BlockingQueue<Runnable> passedWorkQueue) {
477    // shared HTable thread executor not yet initialized
478    if (maxThreads == 0) {
479      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
480    }
481    if (coreThreads == 0) {
482      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
483    }
484    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
485    BlockingQueue<Runnable> workQueue = passedWorkQueue;
486    if (workQueue == null) {
487      workQueue =
488        new LinkedBlockingQueue<>(maxThreads *
489            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
490                HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
491      coreThreads = maxThreads;
492    }
493    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
494        coreThreads,
495        maxThreads,
496        keepAliveTime,
497        TimeUnit.SECONDS,
498        workQueue,
499        Threads.newDaemonThreadFactory(toString() + nameHint));
500    tpe.allowCoreThreadTimeOut(true);
501    return tpe;
502  }
503
504  private ThreadPoolExecutor getMetaLookupPool() {
505    if (this.metaLookupPool == null) {
506      synchronized (this) {
507        if (this.metaLookupPool == null) {
508          //Some of the threads would be used for meta replicas
509          //To start with, threads.max.core threads can hit the meta (including replicas).
510          //After that, requests will get queued up in the passed queue, and only after
511          //the queue is full, a new thread will be started
512          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
513          this.metaLookupPool = getThreadPool(
514             threads,
515             threads,
516             "-metaLookup-shared-", new LinkedBlockingQueue<>());
517        }
518      }
519    }
520    return this.metaLookupPool;
521  }
522
523  protected ExecutorService getCurrentMetaLookupPool() {
524    return metaLookupPool;
525  }
526
527  protected ExecutorService getCurrentBatchPool() {
528    return batchPool;
529  }
530
531  private void shutdownPools() {
532    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
533      shutdownBatchPool(this.batchPool);
534    }
535    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
536      shutdownBatchPool(this.metaLookupPool);
537    }
538  }
539
540  private void shutdownBatchPool(ExecutorService pool) {
541    pool.shutdown();
542    try {
543      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
544        pool.shutdownNow();
545      }
546    } catch (InterruptedException e) {
547      pool.shutdownNow();
548    }
549  }
550
551  /**
552   * For tests only.
553   */
554  @VisibleForTesting
555  RpcClient getRpcClient() {
556    return rpcClient;
557  }
558
559  /**
560   * An identifier that will remain the same for a given connection.
561   */
562  @Override
563  public String toString(){
564    return "hconnection-0x" + Integer.toHexString(hashCode());
565  }
566
567  protected String clusterId = null;
568
569  protected void retrieveClusterId() {
570    if (clusterId != null) {
571      return;
572    }
573    try {
574      this.clusterId = this.registry.getClusterId().get();
575    } catch (InterruptedException | ExecutionException e) {
576      LOG.warn("Retrieve cluster id failed", e);
577    }
578    if (clusterId == null) {
579      clusterId = HConstants.CLUSTER_ID_DEFAULT;
580      LOG.debug("clusterid came back null, using default " + clusterId);
581    }
582  }
583
584  @Override
585  public Configuration getConfiguration() {
586    return this.conf;
587  }
588
589  private void checkClosed() throws DoNotRetryIOException {
590    if (this.closed) {
591      throw new DoNotRetryIOException(toString() + " closed");
592    }
593  }
594
595  /**
596   * @return true if the master is running, throws an exception otherwise
597   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
598   * @deprecated this has been deprecated without a replacement
599   */
600  @Deprecated
601  @Override
602  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
603    // When getting the master connection, we check it's running,
604    // so if there is no exception, it means we've been able to get a
605    // connection on a running master
606    MasterKeepAliveConnection m;
607    try {
608      m = getKeepAliveMasterService();
609    } catch (IOException e) {
610      throw new MasterNotRunningException(e);
611    }
612    m.close();
613    return true;
614  }
615
616  @Override
617  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
618      boolean reload) throws IOException {
619    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
620  }
621
622
623  @Override
624  public boolean isTableEnabled(TableName tableName) throws IOException {
625    return getTableState(tableName).inStates(TableState.State.ENABLED);
626  }
627
628  @Override
629  public boolean isTableDisabled(TableName tableName) throws IOException {
630    return getTableState(tableName).inStates(TableState.State.DISABLED);
631  }
632
633  @Override
634  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
635      throws IOException {
636    checkClosed();
637    try {
638      if (!isTableEnabled(tableName)) {
639        LOG.debug("Table {} not enabled", tableName);
640        return false;
641      }
642      if (TableName.isMetaTableName(tableName)) {
643        // meta table is always available
644        return true;
645      }
646      List<Pair<RegionInfo, ServerName>> locations =
647        MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);
648
649      int notDeployed = 0;
650      int regionCount = 0;
651      for (Pair<RegionInfo, ServerName> pair : locations) {
652        RegionInfo info = pair.getFirst();
653        if (pair.getSecond() == null) {
654          LOG.debug("Table {} has not deployed region {}", tableName,
655              pair.getFirst().getEncodedName());
656          notDeployed++;
657        } else if (splitKeys != null
658            && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
659          for (byte[] splitKey : splitKeys) {
660            // Just check if the splitkey is available
661            if (Bytes.equals(info.getStartKey(), splitKey)) {
662              regionCount++;
663              break;
664            }
665          }
666        } else {
667          // Always empty start row should be counted
668          regionCount++;
669        }
670      }
671      if (notDeployed > 0) {
672        if (LOG.isDebugEnabled()) {
673          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
674        }
675        return false;
676      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
677        if (LOG.isDebugEnabled()) {
678          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
679              splitKeys.length + 1, regionCount);
680        }
681        return false;
682      } else {
683        LOG.trace("Table {} should be available", tableName);
684        return true;
685      }
686    } catch (TableNotFoundException tnfe) {
687      LOG.warn("Table {} does not exist", tableName);
688      return false;
689    }
690  }
691
692  @Override
693  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
694    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
695      RegionInfo.getStartKey(regionName), false, true);
696    return locations == null ? null : locations.getRegionLocation();
697  }
698
699  private boolean isDeadServer(ServerName sn) {
700    if (clusterStatusListener == null) {
701      return false;
702    } else {
703      return clusterStatusListener.isDeadServer(sn);
704    }
705  }
706
707  @Override
708  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
709    return locateRegions(tableName, false, true);
710  }
711
712  @Override
713  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
714      boolean offlined) throws IOException {
715    List<RegionInfo> regions;
716    if (TableName.isMetaTableName(tableName)) {
717      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
718    } else {
719      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
720    }
721    List<HRegionLocation> locations = new ArrayList<>();
722    for (RegionInfo regionInfo : regions) {
723      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
724        continue;
725      }
726      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
727      if (list != null) {
728        for (HRegionLocation loc : list.getRegionLocations()) {
729          if (loc != null) {
730            locations.add(loc);
731          }
732        }
733      }
734    }
735    return locations;
736  }
737
738  @Override
739  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
740      throws IOException {
741    RegionLocations locations = locateRegion(tableName, row, true, true);
742    return locations == null ? null : locations.getRegionLocation();
743  }
744
745  @Override
746  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
747      throws IOException {
748    RegionLocations locations =
749      relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
750    return locations == null ? null
751      : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
752  }
753
754  @Override
755  public RegionLocations relocateRegion(final TableName tableName,
756      final byte [] row, int replicaId) throws IOException{
757    // Since this is an explicit request not to use any caching, finding
758    // disabled tables should not be desirable.  This will ensure that an exception is thrown when
759    // the first time a disabled table is interacted with.
760    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
761      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
762    }
763
764    return locateRegion(tableName, row, false, true, replicaId);
765  }
766
767  @Override
768  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
769      boolean retry) throws IOException {
770    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
771  }
772
773  @Override
774  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
775      boolean retry, int replicaId) throws IOException {
776    checkClosed();
777    if (tableName == null || tableName.getName().length == 0) {
778      throw new IllegalArgumentException("table name cannot be null or zero length");
779    }
780    if (tableName.equals(TableName.META_TABLE_NAME)) {
781      return locateMeta(tableName, useCache, replicaId);
782    } else {
783      // Region not in the cache - have to go to the meta RS
784      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
785    }
786  }
787
788  private RegionLocations locateMeta(final TableName tableName,
789      boolean useCache, int replicaId) throws IOException {
790    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
791    // zookeeper with one request for every region lookup. We cache the META with empty row
792    // key in MetaCache.
793    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
794    RegionLocations locations = null;
795    if (useCache) {
796      locations = getCachedLocation(tableName, metaCacheKey);
797      if (locations != null && locations.getRegionLocation(replicaId) != null) {
798        return locations;
799      }
800    }
801
802    // only one thread should do the lookup.
803    synchronized (metaRegionLock) {
804      // Check the cache again for a hit in case some other thread made the
805      // same query while we were waiting on the lock.
806      if (useCache) {
807        locations = getCachedLocation(tableName, metaCacheKey);
808        if (locations != null && locations.getRegionLocation(replicaId) != null) {
809          return locations;
810        }
811      }
812
813      // Look up from zookeeper
814      locations = get(this.registry.getMetaRegionLocation());
815      if (locations != null) {
816        cacheLocation(tableName, locations);
817      }
818    }
819    return locations;
820  }
821
822  /**
823   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
824   * seeking.
825   */
826  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
827      boolean retry, int replicaId) throws IOException {
828    // If we are supposed to be using the cache, look in the cache to see if we already have the
829    // region.
830    if (useCache) {
831      RegionLocations locations = getCachedLocation(tableName, row);
832      if (locations != null && locations.getRegionLocation(replicaId) != null) {
833        return locations;
834      }
835    }
836    // build the key of the meta region we should be looking for.
837    // the extra 9's on the end are necessary to allow "exact" matches
838    // without knowing the precise region names.
839    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
840    byte[] metaStopKey =
841      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
842    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
843      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
844      .setReadType(ReadType.PREAD);
845    if (this.useMetaReplicas) {
846      s.setConsistency(Consistency.TIMELINE);
847    }
848    int maxAttempts = (retry ? numTries : 1);
849    boolean relocateMeta = false;
850    for (int tries = 0; ; tries++) {
851      if (tries >= maxAttempts) {
852        throw new NoServerForRegionException("Unable to find region for "
853            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
854      }
855      if (useCache) {
856        RegionLocations locations = getCachedLocation(tableName, row);
857        if (locations != null && locations.getRegionLocation(replicaId) != null) {
858          return locations;
859        }
860      } else {
861        // If we are not supposed to be using the cache, delete any existing cached location
862        // so it won't interfere.
863        // We are only supposed to clean the cache for the specific replicaId
864        metaCache.clearCache(tableName, row, replicaId);
865      }
866      // Query the meta region
867      long pauseBase = this.pause;
868      userRegionLock.lock();
869      try {
870        if (useCache) {// re-check cache after get lock
871          RegionLocations locations = getCachedLocation(tableName, row);
872          if (locations != null && locations.getRegionLocation(replicaId) != null) {
873            return locations;
874          }
875        }
876        if (relocateMeta) {
877          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
878            RegionInfo.DEFAULT_REPLICA_ID);
879        }
880        s.resetMvccReadPoint();
881        try (ReversedClientScanner rcs =
882          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
883            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
884          boolean tableNotFound = true;
885          for (;;) {
886            Result regionInfoRow = rcs.next();
887            if (regionInfoRow == null) {
888              if (tableNotFound) {
889                throw new TableNotFoundException(tableName);
890              } else {
891                throw new IOException(
892                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
893              }
894            }
895            tableNotFound = false;
896            // convert the row result into the HRegionLocation we need!
897            RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
898            if (locations == null || locations.getRegionLocation(replicaId) == null) {
899              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
900            }
901            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
902            if (regionInfo == null) {
903              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
904                ", row=" + regionInfoRow);
905            }
906            // See HBASE-20182. It is possible that we locate to a split parent even after the
907            // children are online, so here we need to skip this region and go to the next one.
908            if (regionInfo.isSplitParent()) {
909              continue;
910            }
911            if (regionInfo.isOffline()) {
912              throw new RegionOfflineException("Region offline; disable table call? " +
913                  regionInfo.getRegionNameAsString());
914            }
915            // It is possible that the split children have not been online yet and we have skipped
916            // the parent in the above condition, so we may have already reached a region which does
917            // not contains us.
918            if (!regionInfo.containsRow(row)) {
919              throw new IOException(
920                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
921            }
922            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
923            if (serverName == null) {
924              throw new NoServerForRegionException("No server address listed in " +
925                TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
926                " containing row " + Bytes.toStringBinary(row));
927            }
928            if (isDeadServer(serverName)) {
929              throw new RegionServerStoppedException(
930                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
931                  " is managed by the server " + serverName + ", but it is dead.");
932            }
933            // Instantiate the location
934            cacheLocation(tableName, locations);
935            return locations;
936          }
937        }
938      } catch (TableNotFoundException e) {
939        // if we got this error, probably means the table just plain doesn't
940        // exist. rethrow the error immediately. this should always be coming
941        // from the HTable constructor.
942        throw e;
943      } catch (IOException e) {
944        ExceptionUtil.rethrowIfInterrupt(e);
945        if (e instanceof RemoteException) {
946          e = ((RemoteException)e).unwrapRemoteException();
947        }
948        if (e instanceof CallQueueTooBigException) {
949          // Give a special check on CallQueueTooBigException, see #HBASE-17114
950          pauseBase = this.pauseForCQTBE;
951        }
952        if (tries < maxAttempts - 1) {
953          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
954            "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
955        } else {
956          throw e;
957        }
958        // Only relocate the parent region if necessary
959        relocateMeta =
960          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
961      } finally {
962        userRegionLock.unlock();
963      }
964      try{
965        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
966      } catch (InterruptedException e) {
967        throw new InterruptedIOException("Giving up trying to location region in " +
968          "meta: thread is interrupted.");
969      }
970    }
971  }
972
973  /**
974   * Put a newly discovered HRegionLocation into the cache.
975   * @param tableName The table name.
976   * @param location the new location
977   */
978  @Override
979  public void cacheLocation(final TableName tableName, final RegionLocations location) {
980    metaCache.cacheLocation(tableName, location);
981  }
982
983  /**
984   * Search the cache for a location that fits our table and row key.
985   * Return null if no suitable region is located.
986   * @return Null or region location found in cache.
987   */
988  RegionLocations getCachedLocation(final TableName tableName,
989      final byte [] row) {
990    return metaCache.getCachedLocation(tableName, row);
991  }
992
993  public void clearRegionCache(final TableName tableName, byte[] row) {
994    metaCache.clearCache(tableName, row);
995  }
996
997  /*
998   * Delete all cached entries of a table that maps to a specific location.
999   */
1000  @Override
1001  public void clearCaches(final ServerName serverName) {
1002    metaCache.clearCache(serverName);
1003  }
1004
1005  @Override
1006  public void clearRegionLocationCache() {
1007    metaCache.clearCache();
1008  }
1009
1010  @Override
1011  public void clearRegionCache(final TableName tableName) {
1012    metaCache.clearCache(tableName);
1013  }
1014
1015  /**
1016   * Put a newly discovered HRegionLocation into the cache.
1017   * @param tableName The table name.
1018   * @param source the source of the new location, if it's not coming from meta
1019   * @param location the new location
1020   */
1021  private void cacheLocation(final TableName tableName, final ServerName source,
1022      final HRegionLocation location) {
1023    metaCache.cacheLocation(tableName, source, location);
1024  }
1025
  // Map keyed by service name + regionserver to service stub implementation.
  // Keys come from getStubKey(...); values are protobuf blocking stubs created lazily through
  // computeIfAbsentEx (see getAdmin, getClient and MasterServiceStubMaker.makeStubNoRetries).
  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
1028
1029  /**
1030   * State of the MasterService connection/setup.
1031   */
1032  static class MasterServiceState {
1033    Connection connection;
1034
1035    MasterProtos.MasterService.BlockingInterface stub;
1036    int userCount;
1037
1038    MasterServiceState(final Connection connection) {
1039      super();
1040      this.connection = connection;
1041    }
1042
1043    @Override
1044    public String toString() {
1045      return "MasterService";
1046    }
1047
1048    Object getStub() {
1049      return this.stub;
1050    }
1051
1052    void clearStub() {
1053      this.stub = null;
1054    }
1055
1056    boolean isMasterRunning() throws IOException {
1057      MasterProtos.IsMasterRunningResponse response = null;
1058      try {
1059        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1060      } catch (Exception e) {
1061        throw ProtobufUtil.handleRemoteException(e);
1062      }
1063      return response != null? response.getIsMasterRunning(): false;
1064    }
1065  }
1066
1067  /**
1068   * The record of errors for servers.
1069   */
1070  static class ServerErrorTracker {
1071    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
1072    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>();
1073    private final long canRetryUntil;
1074    private final int maxTries;// max number to try
1075    private final long startTrackingTime;
1076
1077    /**
1078     * Constructor
1079     * @param timeout how long to wait before timeout, in unit of millisecond
1080     * @param maxTries how many times to try
1081     */
1082    public ServerErrorTracker(long timeout, int maxTries) {
1083      this.maxTries = maxTries;
1084      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
1085      this.startTrackingTime = new Date().getTime();
1086    }
1087
1088    /**
1089     * We stop to retry when we have exhausted BOTH the number of tries and the time allocated.
1090     * @param numAttempt how many times we have tried by now
1091     */
1092    boolean canTryMore(int numAttempt) {
1093      // If there is a single try we must not take into account the time.
1094      return numAttempt < maxTries || (maxTries > 1 &&
1095          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
1096    }
1097
1098    /**
1099     * Calculates the back-off time for a retrying request to a particular server.
1100     *
1101     * @param server    The server in question.
1102     * @param basePause The default hci pause.
1103     * @return The time to wait before sending next request.
1104     */
1105    long calculateBackoffTime(ServerName server, long basePause) {
1106      long result;
1107      ServerErrors errorStats = errorsByServer.get(server);
1108      if (errorStats != null) {
1109        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
1110      } else {
1111        result = 0; // yes, if the server is not in our list we don't wait before retrying.
1112      }
1113      return result;
1114    }
1115
1116    /**
1117     * Reports that there was an error on the server to do whatever bean-counting necessary.
1118     * @param server The server in question.
1119     */
1120    void reportServerError(ServerName server) {
1121      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
1122    }
1123
1124    long getStartTrackingTime() {
1125      return startTrackingTime;
1126    }
1127
1128    /**
1129     * The record of errors for a server.
1130     */
1131    private static class ServerErrors {
1132      private final AtomicInteger retries = new AtomicInteger(0);
1133
1134      public int getCount() {
1135        return retries.get();
1136      }
1137
1138      public void addError() {
1139        retries.incrementAndGet();
1140      }
1141    }
1142  }
1143
1144  /**
1145   * Class to make a MasterServiceStubMaker stub.
1146   */
1147  private final class MasterServiceStubMaker {
1148
1149    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
1150        throws IOException {
1151      try {
1152        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1153      } catch (ServiceException e) {
1154        throw ProtobufUtil.handleRemoteException(e);
1155      }
1156    }
1157
1158    /**
1159     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
1160     * services nor their interfaces. Let the caller do appropriate casting.
1161     * @return A stub for master services.
1162     */
1163    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
1164        throws IOException, KeeperException {
1165      ServerName sn = get(registry.getMasterAddress());
1166      if (sn == null) {
1167        String msg = "ZooKeeper available but no active master location found";
1168        LOG.info(msg);
1169        throw new MasterNotRunningException(msg);
1170      }
1171      if (isDeadServer(sn)) {
1172        throw new MasterNotRunningException(sn + " is dead.");
1173      }
1174      // Use the security info interface name as our stub key
1175      String key =
1176          getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
1177      MasterProtos.MasterService.BlockingInterface stub =
1178          (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1179            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1180            return MasterProtos.MasterService.newBlockingStub(channel);
1181          });
1182      isMasterRunning(stub);
1183      return stub;
1184    }
1185
1186    /**
1187     * Create a stub against the master. Retry if necessary.
1188     * @return A stub to do <code>intf</code> against the master
1189     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
1190     */
1191    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
1192      // The lock must be at the beginning to prevent multiple master creations
1193      // (and leaks) in a multithread context
1194      synchronized (masterLock) {
1195        Exception exceptionCaught = null;
1196        if (!closed) {
1197          try {
1198            return makeStubNoRetries();
1199          } catch (IOException e) {
1200            exceptionCaught = e;
1201          } catch (KeeperException e) {
1202            exceptionCaught = e;
1203          }
1204          throw new MasterNotRunningException(exceptionCaught);
1205        } else {
1206          throw new DoNotRetryIOException("Connection was closed while trying to get master");
1207        }
1208      }
1209    }
1210  }
1211
1212  @Override
1213  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
1214    return getAdmin(get(registry.getMasterAddress()));
1215  }
1216
1217  @Override
1218  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
1219      throws IOException {
1220    checkClosed();
1221    if (isDeadServer(serverName)) {
1222      throw new RegionServerStoppedException(serverName + " is dead.");
1223    }
1224    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
1225      this.hostnamesCanChange);
1226    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1227      BlockingRpcChannel channel =
1228          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1229      return AdminProtos.AdminService.newBlockingStub(channel);
1230    });
1231  }
1232
1233  @Override
1234  public BlockingInterface getClient(ServerName serverName) throws IOException {
1235    checkClosed();
1236    if (isDeadServer(serverName)) {
1237      throw new RegionServerStoppedException(serverName + " is dead.");
1238    }
1239    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
1240      serverName, this.hostnamesCanChange);
1241    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1242      BlockingRpcChannel channel =
1243          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1244      return ClientProtos.ClientService.newBlockingStub(channel);
1245    });
1246  }
1247
  // Shared state of the keep-alive MasterService connection. Under masterLock,
  // getKeepAliveMasterService replaces its stub when it is not connected and running, and
  // increments its userCount for every connection handed out.
  final MasterServiceState masterServiceState = new MasterServiceState(this);
1249
1250  @Override
1251  public MasterKeepAliveConnection getMaster() throws IOException {
1252    return getKeepAliveMasterService();
1253  }
1254
1255  private void resetMasterServiceState(final MasterServiceState mss) {
1256    mss.userCount++;
1257  }
1258
1259  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
1260    synchronized (masterLock) {
1261      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1262        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1263        this.masterServiceState.stub = stubMaker.makeStub();
1264      }
1265      resetMasterServiceState(this.masterServiceState);
1266    }
1267    // Ugly delegation just so we can add in a Close method.
1268    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
1269    return new MasterKeepAliveConnection() {
1270      MasterServiceState mss = masterServiceState;
1271
1272      @Override
1273      public MasterProtos.AbortProcedureResponse abortProcedure(
1274          RpcController controller,
1275          MasterProtos.AbortProcedureRequest request) throws ServiceException {
1276        return stub.abortProcedure(controller, request);
1277      }
1278
1279      @Override
1280      public MasterProtos.GetProceduresResponse getProcedures(
1281          RpcController controller,
1282          MasterProtos.GetProceduresRequest request) throws ServiceException {
1283        return stub.getProcedures(controller, request);
1284      }
1285
1286      @Override
1287      public MasterProtos.GetLocksResponse getLocks(
1288          RpcController controller,
1289          MasterProtos.GetLocksRequest request) throws ServiceException {
1290        return stub.getLocks(controller, request);
1291      }
1292
1293      @Override
1294      public MasterProtos.AddColumnResponse addColumn(
1295          RpcController controller,
1296          MasterProtos.AddColumnRequest request) throws ServiceException {
1297        return stub.addColumn(controller, request);
1298      }
1299
1300      @Override
1301      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
1302          MasterProtos.DeleteColumnRequest request)
1303      throws ServiceException {
1304        return stub.deleteColumn(controller, request);
1305      }
1306
1307      @Override
1308      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
1309          MasterProtos.ModifyColumnRequest request)
1310      throws ServiceException {
1311        return stub.modifyColumn(controller, request);
1312      }
1313
1314      @Override
1315      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
1316          MasterProtos.MoveRegionRequest request) throws ServiceException {
1317        return stub.moveRegion(controller, request);
1318      }
1319
1320      @Override
1321      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(
1322          RpcController controller, MasterProtos.MergeTableRegionsRequest request)
1323          throws ServiceException {
1324        return stub.mergeTableRegions(controller, request);
1325      }
1326
1327      @Override
1328      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
1329          MasterProtos.AssignRegionRequest request) throws ServiceException {
1330        return stub.assignRegion(controller, request);
1331      }
1332
1333      @Override
1334      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
1335          MasterProtos.UnassignRegionRequest request) throws ServiceException {
1336        return stub.unassignRegion(controller, request);
1337      }
1338
1339      @Override
1340      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
1341          MasterProtos.OfflineRegionRequest request) throws ServiceException {
1342        return stub.offlineRegion(controller, request);
1343      }
1344
1345      @Override
1346      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
1347          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
1348        return stub.splitRegion(controller, request);
1349      }
1350
1351      @Override
1352      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
1353          MasterProtos.DeleteTableRequest request) throws ServiceException {
1354        return stub.deleteTable(controller, request);
1355      }
1356
1357      @Override
1358      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
1359          MasterProtos.TruncateTableRequest request) throws ServiceException {
1360        return stub.truncateTable(controller, request);
1361      }
1362
1363      @Override
1364      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
1365          MasterProtos.EnableTableRequest request) throws ServiceException {
1366        return stub.enableTable(controller, request);
1367      }
1368
1369      @Override
1370      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
1371          MasterProtos.DisableTableRequest request) throws ServiceException {
1372        return stub.disableTable(controller, request);
1373      }
1374
1375      @Override
1376      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
1377          MasterProtos.ModifyTableRequest request) throws ServiceException {
1378        return stub.modifyTable(controller, request);
1379      }
1380
1381      @Override
1382      public MasterProtos.CreateTableResponse createTable(RpcController controller,
1383          MasterProtos.CreateTableRequest request) throws ServiceException {
1384        return stub.createTable(controller, request);
1385      }
1386
1387      @Override
1388      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
1389          MasterProtos.ShutdownRequest request) throws ServiceException {
1390        return stub.shutdown(controller, request);
1391      }
1392
1393      @Override
1394      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
1395          MasterProtos.StopMasterRequest request) throws ServiceException {
1396        return stub.stopMaster(controller, request);
1397      }
1398
1399      @Override
1400      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1401          final RpcController controller,
1402          final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
1403        return stub.isMasterInMaintenanceMode(controller, request);
1404      }
1405
1406      @Override
1407      public MasterProtos.BalanceResponse balance(RpcController controller,
1408          MasterProtos.BalanceRequest request) throws ServiceException {
1409        return stub.balance(controller, request);
1410      }
1411
1412      @Override
1413      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(
1414          RpcController controller, MasterProtos.SetBalancerRunningRequest request)
1415          throws ServiceException {
1416        return stub.setBalancerRunning(controller, request);
1417      }
1418
1419      @Override
1420      public NormalizeResponse normalize(RpcController controller,
1421          NormalizeRequest request) throws ServiceException {
1422        return stub.normalize(controller, request);
1423      }
1424
1425      @Override
1426      public SetNormalizerRunningResponse setNormalizerRunning(
1427          RpcController controller, SetNormalizerRunningRequest request)
1428          throws ServiceException {
1429        return stub.setNormalizerRunning(controller, request);
1430      }
1431
1432      @Override
1433      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
1434          MasterProtos.RunCatalogScanRequest request) throws ServiceException {
1435        return stub.runCatalogScan(controller, request);
1436      }
1437
1438      @Override
1439      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
1440          RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
1441          throws ServiceException {
1442        return stub.enableCatalogJanitor(controller, request);
1443      }
1444
1445      @Override
1446      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1447          RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
1448          throws ServiceException {
1449        return stub.isCatalogJanitorEnabled(controller, request);
1450      }
1451
1452      @Override
1453      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
1454          MasterProtos.RunCleanerChoreRequest request)
1455          throws ServiceException {
1456        return stub.runCleanerChore(controller, request);
1457      }
1458
1459      @Override
1460      public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning(
1461          RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request)
1462          throws ServiceException {
1463        return stub.setCleanerChoreRunning(controller, request);
1464      }
1465
1466      @Override
1467      public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled(
1468          RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request)
1469          throws ServiceException {
1470        return stub.isCleanerChoreEnabled(controller, request);
1471      }
1472
1473      @Override
1474      public ClientProtos.CoprocessorServiceResponse execMasterService(
1475          RpcController controller, ClientProtos.CoprocessorServiceRequest request)
1476          throws ServiceException {
1477        return stub.execMasterService(controller, request);
1478      }
1479
1480      @Override
1481      public MasterProtos.SnapshotResponse snapshot(RpcController controller,
1482          MasterProtos.SnapshotRequest request) throws ServiceException {
1483        return stub.snapshot(controller, request);
1484      }
1485
1486      @Override
1487      public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
1488          RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
1489          throws ServiceException {
1490        return stub.getCompletedSnapshots(controller, request);
1491      }
1492
1493      @Override
1494      public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1495          MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
1496        return stub.deleteSnapshot(controller, request);
1497      }
1498
1499      @Override
1500      public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1501          MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
1502        return stub.isSnapshotDone(controller, request);
1503      }
1504
1505      @Override
1506      public MasterProtos.RestoreSnapshotResponse restoreSnapshot(
1507          RpcController controller, MasterProtos.RestoreSnapshotRequest request)
1508          throws ServiceException {
1509        return stub.restoreSnapshot(controller, request);
1510      }
1511
1512      @Override
1513      public MasterProtos.ExecProcedureResponse execProcedure(
1514          RpcController controller, MasterProtos.ExecProcedureRequest request)
1515          throws ServiceException {
1516        return stub.execProcedure(controller, request);
1517      }
1518
1519      @Override
1520      public MasterProtos.ExecProcedureResponse execProcedureWithRet(
1521          RpcController controller, MasterProtos.ExecProcedureRequest request)
1522          throws ServiceException {
1523        return stub.execProcedureWithRet(controller, request);
1524      }
1525
1526      @Override
1527      public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
1528          MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
1529        return stub.isProcedureDone(controller, request);
1530      }
1531
1532      @Override
1533      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
1534          MasterProtos.GetProcedureResultRequest request) throws ServiceException {
1535        return stub.getProcedureResult(controller, request);
1536      }
1537
1538      @Override
1539      public MasterProtos.IsMasterRunningResponse isMasterRunning(
1540          RpcController controller, MasterProtos.IsMasterRunningRequest request)
1541          throws ServiceException {
1542        return stub.isMasterRunning(controller, request);
1543      }
1544
1545      @Override
1546      public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller,
1547          MasterProtos.ModifyNamespaceRequest request)
1548      throws ServiceException {
1549        return stub.modifyNamespace(controller, request);
1550      }
1551
1552      @Override
1553      public MasterProtos.CreateNamespaceResponse createNamespace(
1554          RpcController controller,
1555          MasterProtos.CreateNamespaceRequest request) throws ServiceException {
1556        return stub.createNamespace(controller, request);
1557      }
1558
1559      @Override
1560      public MasterProtos.DeleteNamespaceResponse deleteNamespace(
1561          RpcController controller,
1562          MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
1563        return stub.deleteNamespace(controller, request);
1564      }
1565
1566      @Override
1567      public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
1568          RpcController controller,
1569          MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
1570        return stub.getNamespaceDescriptor(controller, request);
1571      }
1572
1573      @Override
1574      public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
1575          RpcController controller,
1576          MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
1577        return stub.listNamespaceDescriptors(controller, request);
1578      }
1579
1580      @Override
1581      public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
1582          RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
1583              throws ServiceException {
1584        return stub.listTableDescriptorsByNamespace(controller, request);
1585      }
1586
1587      @Override
1588      public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
1589          RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
1590              throws ServiceException {
1591        return stub.listTableNamesByNamespace(controller, request);
1592      }
1593
1594      @Override
1595      public MasterProtos.GetTableStateResponse getTableState(
1596              RpcController controller, MasterProtos.GetTableStateRequest request)
1597              throws ServiceException {
1598        return stub.getTableState(controller, request);
1599      }
1600
1601      @Override
1602      public void close() {
1603        release(this.mss);
1604      }
1605
1606      @Override
1607      public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
1608          RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
1609          throws ServiceException {
1610        return stub.getSchemaAlterStatus(controller, request);
1611      }
1612
1613      @Override
1614      public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(
1615          RpcController controller, MasterProtos.GetTableDescriptorsRequest request)
1616          throws ServiceException {
1617        return stub.getTableDescriptors(controller, request);
1618      }
1619
1620      @Override
1621      public MasterProtos.GetTableNamesResponse getTableNames(
1622          RpcController controller, MasterProtos.GetTableNamesRequest request)
1623          throws ServiceException {
1624        return stub.getTableNames(controller, request);
1625      }
1626
1627      @Override
1628      public MasterProtos.GetClusterStatusResponse getClusterStatus(
1629          RpcController controller, MasterProtos.GetClusterStatusRequest request)
1630          throws ServiceException {
1631        return stub.getClusterStatus(controller, request);
1632      }
1633
1634      @Override
1635      public MasterProtos.SetQuotaResponse setQuota(
1636          RpcController controller, MasterProtos.SetQuotaRequest request)
1637          throws ServiceException {
1638        return stub.setQuota(controller, request);
1639      }
1640
1641      @Override
1642      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
1643          RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
1644          throws ServiceException {
1645        return stub.getLastMajorCompactionTimestamp(controller, request);
1646      }
1647
1648      @Override
1649      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
1650          RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
1651          throws ServiceException {
1652        return stub.getLastMajorCompactionTimestampForRegion(controller, request);
1653      }
1654
1655      @Override
1656      public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
1657          IsBalancerEnabledRequest request) throws ServiceException {
1658        return stub.isBalancerEnabled(controller, request);
1659      }
1660
1661      @Override
1662      public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
1663        RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request)
1664        throws ServiceException {
1665        return stub.setSplitOrMergeEnabled(controller, request);
1666      }
1667
1668      @Override
1669      public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
1670        RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request)
1671              throws ServiceException {
1672        return stub.isSplitOrMergeEnabled(controller, request);
1673      }
1674
1675      @Override
1676      public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
1677          IsNormalizerEnabledRequest request) throws ServiceException {
1678        return stub.isNormalizerEnabled(controller, request);
1679      }
1680
1681      @Override
1682      public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
1683          SecurityCapabilitiesRequest request) throws ServiceException {
1684        return stub.getSecurityCapabilities(controller, request);
1685      }
1686
1687      @Override
1688      public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
1689          AddReplicationPeerRequest request) throws ServiceException {
1690        return stub.addReplicationPeer(controller, request);
1691      }
1692
1693      @Override
1694      public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
1695          RemoveReplicationPeerRequest request) throws ServiceException {
1696        return stub.removeReplicationPeer(controller, request);
1697      }
1698
1699      @Override
1700      public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
1701          EnableReplicationPeerRequest request) throws ServiceException {
1702        return stub.enableReplicationPeer(controller, request);
1703      }
1704
1705      @Override
1706      public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
1707          DisableReplicationPeerRequest request) throws ServiceException {
1708        return stub.disableReplicationPeer(controller, request);
1709      }
1710
1711      @Override
1712      public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
1713          ListDecommissionedRegionServersRequest request) throws ServiceException {
1714        return stub.listDecommissionedRegionServers(controller, request);
1715      }
1716
1717      @Override
1718      public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
1719          DecommissionRegionServersRequest request) throws ServiceException {
1720        return stub.decommissionRegionServers(controller, request);
1721      }
1722
1723      @Override
1724      public RecommissionRegionServerResponse recommissionRegionServer(
1725          RpcController controller, RecommissionRegionServerRequest request)
1726          throws ServiceException {
1727        return stub.recommissionRegionServer(controller, request);
1728      }
1729
1730      @Override
1731      public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
1732          GetReplicationPeerConfigRequest request) throws ServiceException {
1733        return stub.getReplicationPeerConfig(controller, request);
1734      }
1735
1736      @Override
1737      public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig(
1738          RpcController controller, UpdateReplicationPeerConfigRequest request)
1739          throws ServiceException {
1740        return stub.updateReplicationPeerConfig(controller, request);
1741      }
1742
1743      @Override
1744      public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
1745          ListReplicationPeersRequest request) throws ServiceException {
1746        return stub.listReplicationPeers(controller, request);
1747      }
1748
1749      @Override
1750      public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
1751          RpcController controller, GetSpaceQuotaRegionSizesRequest request)
1752          throws ServiceException {
1753        return stub.getSpaceQuotaRegionSizes(controller, request);
1754      }
1755
1756      @Override
1757      public GetQuotaStatesResponse getQuotaStates(
1758          RpcController controller, GetQuotaStatesRequest request) throws ServiceException {
1759        return stub.getQuotaStates(controller, request);
1760      }
1761
1762      @Override
1763      public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller,
1764          MasterProtos.ClearDeadServersRequest request) throws ServiceException {
1765        return stub.clearDeadServers(controller, request);
1766      }
1767
1768      @Override
1769      public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
1770          SwitchRpcThrottleRequest request) throws ServiceException {
1771        return stub.switchRpcThrottle(controller, request);
1772      }
1773
1774      @Override
1775      public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
1776          IsRpcThrottleEnabledRequest request) throws ServiceException {
1777        return stub.isRpcThrottleEnabled(controller, request);
1778      }
1779
1780      @Override
1781      public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller,
1782          SwitchExceedThrottleQuotaRequest request) throws ServiceException {
1783        return stub.switchExceedThrottleQuota(controller, request);
1784      }
1785
1786      @Override
1787      public AccessControlProtos.GrantResponse grant(RpcController controller,
1788          AccessControlProtos.GrantRequest request) throws ServiceException {
1789        return stub.grant(controller, request);
1790      }
1791
1792      @Override
1793      public AccessControlProtos.RevokeResponse revoke(RpcController controller,
1794          AccessControlProtos.RevokeRequest request) throws ServiceException {
1795        return stub.revoke(controller, request);
1796      }
1797
1798      @Override
1799      public GetUserPermissionsResponse getUserPermissions(RpcController controller,
1800          GetUserPermissionsRequest request) throws ServiceException {
1801        return stub.getUserPermissions(controller, request);
1802      }
1803
1804      @Override
1805      public HasUserPermissionsResponse hasUserPermissions(RpcController controller,
1806          HasUserPermissionsRequest request) throws ServiceException {
1807        return stub.hasUserPermissions(controller, request);
1808      }
1809    };
1810  }
1811
1812  private static void release(MasterServiceState mss) {
1813    if (mss != null && mss.connection != null) {
1814      ((ConnectionImplementation)mss.connection).releaseMaster(mss);
1815    }
1816  }
1817
1818  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
1819    if (mss.getStub() == null){
1820      return false;
1821    }
1822    try {
1823      return mss.isMasterRunning();
1824    } catch (UndeclaredThrowableException e) {
1825      // It's somehow messy, but we can receive exceptions such as
1826      //  java.net.ConnectException but they're not declared. So we catch it...
1827      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
1828      return false;
1829    } catch (IOException se) {
1830      LOG.warn("Checking master connection", se);
1831      return false;
1832    }
1833  }
1834
1835  void releaseMaster(MasterServiceState mss) {
1836    if (mss.getStub() == null) {
1837      return;
1838    }
1839    synchronized (masterLock) {
1840      --mss.userCount;
1841    }
1842  }
1843
1844  private void closeMasterService(MasterServiceState mss) {
1845    if (mss.getStub() != null) {
1846      LOG.info("Closing master protocol: " + mss);
1847      mss.clearStub();
1848    }
1849    mss.userCount = 0;
1850  }
1851
1852  /**
1853   * Immediate close of the shared master. Can be by the delayed close or when closing the
1854   * connection itself.
1855   */
1856  private void closeMaster() {
1857    synchronized (masterLock) {
1858      closeMasterService(masterServiceState);
1859    }
1860  }
1861
1862  void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) {
1863    HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
1864    cacheLocation(hri.getTable(), source, newHrl);
1865  }
1866
1867  @Override
1868  public void deleteCachedRegionLocation(final HRegionLocation location) {
1869    metaCache.clearCache(location);
1870  }
1871
1872  /**
1873   * Update the location with the new value (if the exception is a RegionMovedException)
1874   * or delete it from the cache. Does nothing if we can be sure from the exception that
1875   * the location is still accurate, or if the cache has already been updated.
1876   * @param exception an object (to simplify user code) on which we will try to find a nested
1877   *   or wrapped or both RegionMovedException
1878   * @param source server that is the source of the location update.
1879   */
1880  @Override
1881  public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
1882    final Object exception, final ServerName source) {
1883    if (rowkey == null || tableName == null) {
1884      LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
1885          ", tableName=" + (tableName == null ? "null" : tableName));
1886      return;
1887    }
1888
1889    if (source == null) {
1890      // This should not happen, but let's secure ourselves.
1891      return;
1892    }
1893
1894    if (regionName == null) {
1895      // we do not know which region, so just remove the cache entry for the row and server
1896      if (metrics != null) {
1897        metrics.incrCacheDroppingExceptions(exception);
1898      }
1899      metaCache.clearCache(tableName, rowkey, source);
1900      return;
1901    }
1902
1903    // Is it something we have already updated?
1904    final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
1905    HRegionLocation oldLocation = null;
1906    if (oldLocations != null) {
1907      oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
1908    }
1909    if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
1910      // There is no such location in the cache (it's been removed already) or
1911      // the cache has already been refreshed with a different location.  => nothing to do
1912      return;
1913    }
1914
1915    RegionInfo regionInfo = oldLocation.getRegion();
1916    Throwable cause = ClientExceptionsUtil.findException(exception);
1917    if (cause != null) {
1918      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
1919        // We know that the region is still on this region server
1920        return;
1921      }
1922
1923      if (cause instanceof RegionMovedException) {
1924        RegionMovedException rme = (RegionMovedException) cause;
1925        if (LOG.isTraceEnabled()) {
1926          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
1927              rme.getHostname() + ":" + rme.getPort() +
1928              " according to " + source.getAddress());
1929        }
1930        // We know that the region is not anymore on this region server, but we know
1931        //  the new location.
1932        updateCachedLocation(
1933            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
1934        return;
1935      }
1936    }
1937
1938    if (metrics != null) {
1939      metrics.incrCacheDroppingExceptions(exception);
1940    }
1941
1942    // If we're here, it means that can cannot be sure about the location, so we remove it from
1943    // the cache. Do not send the source because source can be a new server in the same host:port
1944    metaCache.clearCache(regionInfo);
1945  }
1946
1947  @Override
1948  public AsyncProcess getAsyncProcess() {
1949    return asyncProcess;
1950  }
1951
1952  @Override
1953  public ServerStatisticTracker getStatisticsTracker() {
1954    return this.stats;
1955  }
1956
1957  @Override
1958  public ClientBackoffPolicy getBackoffPolicy() {
1959    return this.backoffPolicy;
1960  }
1961
1962  /*
1963   * Return the number of cached region for a table. It will only be called
1964   * from a unit test.
1965   */
1966  @VisibleForTesting
1967  int getNumberOfCachedRegionLocations(final TableName tableName) {
1968    return metaCache.getNumberOfCachedRegionLocations(tableName);
1969  }
1970
1971  @Override
1972  public void abort(final String msg, Throwable t) {
1973    if (t != null) {
1974      LOG.error(HBaseMarkers.FATAL, msg, t);
1975    } else {
1976      LOG.error(HBaseMarkers.FATAL, msg);
1977    }
1978    this.aborted = true;
1979    close();
1980    this.closed = true;
1981  }
1982
1983  @Override
1984  public boolean isClosed() {
1985    return this.closed;
1986  }
1987
1988  @Override
1989  public boolean isAborted(){
1990    return this.aborted;
1991  }
1992
1993  @Override
1994  public void close() {
1995    if (this.closed) {
1996      return;
1997    }
1998    closeMaster();
1999    shutdownPools();
2000    if (this.metrics != null) {
2001      this.metrics.shutdown();
2002    }
2003    this.closed = true;
2004    registry.close();
2005    this.stubs.clear();
2006    if (clusterStatusListener != null) {
2007      clusterStatusListener.close();
2008    }
2009    if (rpcClient != null) {
2010      rpcClient.close();
2011    }
2012    if (authService != null) {
2013      authService.shutdown();
2014    }
2015  }
2016
2017  /**
2018   * Close the connection for good. On the off chance that someone is unable to close
2019   * the connection, perhaps because it bailed out prematurely, the method
2020   * below will ensure that this instance is cleaned up.
2021   * Caveat: The JVM may take an unknown amount of time to call finalize on an
2022   * unreachable object, so our hope is that every consumer cleans up after
2023   * itself, like any good citizen.
2024   */
2025  @Override
2026  protected void finalize() throws Throwable {
2027    super.finalize();
2028    close();
2029  }
2030
2031  @Override
2032  public NonceGenerator getNonceGenerator() {
2033    return nonceGenerator;
2034  }
2035
2036  @Override
2037  public TableState getTableState(TableName tableName) throws IOException {
2038    checkClosed();
2039    TableState tableState = MetaTableAccessor.getTableState(this, tableName);
2040    if (tableState == null) {
2041      throw new TableNotFoundException(tableName);
2042    }
2043    return tableState;
2044  }
2045
2046  @Override
2047  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2048    return RpcRetryingCallerFactory
2049        .instantiate(conf, this.interceptor, this.getStatisticsTracker());
2050  }
2051
2052  @Override
2053  public boolean hasCellBlockSupport() {
2054    return this.rpcClient.hasCellBlockSupport();
2055  }
2056
2057  @Override
2058  public ConnectionConfiguration getConnectionConfiguration() {
2059    return this.connectionConfig;
2060  }
2061
2062  @Override
2063  public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
2064    return this.rpcCallerFactory;
2065  }
2066
2067  @Override
2068  public RpcControllerFactory getRpcControllerFactory() {
2069    return this.rpcControllerFactory;
2070  }
2071
2072  private static <T> T get(CompletableFuture<T> future) throws IOException {
2073    try {
2074      return future.get();
2075    } catch (InterruptedException e) {
2076      Thread.currentThread().interrupt();
2077      throw (IOException) new InterruptedIOException().initCause(e);
2078    } catch (ExecutionException e) {
2079      Throwable cause = e.getCause();
2080      Throwables.propagateIfPossible(cause, IOException.class);
2081      throw new IOException(cause);
2082    }
2083  }
2084}