001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
021import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
022import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
027import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
028import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
029import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx;
030
031import edu.umd.cs.findbugs.annotations.Nullable;
032import io.opentelemetry.api.trace.Span;
033import io.opentelemetry.context.Scope;
034import java.io.Closeable;
035import java.io.IOException;
036import java.io.InterruptedIOException;
037import java.lang.reflect.UndeclaredThrowableException;
038import java.util.ArrayList;
039import java.util.Collections;
040import java.util.Date;
041import java.util.List;
042import java.util.concurrent.BlockingQueue;
043import java.util.concurrent.CompletableFuture;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.ConcurrentMap;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.ExecutorService;
048import java.util.concurrent.LinkedBlockingQueue;
049import java.util.concurrent.ThreadPoolExecutor;
050import java.util.concurrent.TimeUnit;
051import java.util.concurrent.atomic.AtomicInteger;
052import java.util.concurrent.locks.ReentrantLock;
053import org.apache.hadoop.conf.Configuration;
054import org.apache.hadoop.hbase.AuthUtil;
055import org.apache.hadoop.hbase.CatalogReplicaMode;
056import org.apache.hadoop.hbase.ChoreService;
057import org.apache.hadoop.hbase.DoNotRetryIOException;
058import org.apache.hadoop.hbase.HBaseServerException;
059import org.apache.hadoop.hbase.HConstants;
060import org.apache.hadoop.hbase.HRegionLocation;
061import org.apache.hadoop.hbase.MasterNotRunningException;
062import org.apache.hadoop.hbase.MetaTableAccessor;
063import org.apache.hadoop.hbase.RegionLocations;
064import org.apache.hadoop.hbase.ServerName;
065import org.apache.hadoop.hbase.TableName;
066import org.apache.hadoop.hbase.TableNotEnabledException;
067import org.apache.hadoop.hbase.TableNotFoundException;
068import org.apache.hadoop.hbase.ZooKeeperConnectionException;
069import org.apache.hadoop.hbase.client.Scan.ReadType;
070import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
071import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
072import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
073import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
074import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
075import org.apache.hadoop.hbase.exceptions.RegionMovedException;
076import org.apache.hadoop.hbase.ipc.RpcClient;
077import org.apache.hadoop.hbase.ipc.RpcClientFactory;
078import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
079import org.apache.hadoop.hbase.log.HBaseMarkers;
080import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
081import org.apache.hadoop.hbase.security.User;
082import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
083import org.apache.hadoop.hbase.trace.TraceUtil;
084import org.apache.hadoop.hbase.util.Bytes;
085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
086import org.apache.hadoop.hbase.util.ExceptionUtil;
087import org.apache.hadoop.hbase.util.Pair;
088import org.apache.hadoop.hbase.util.ReflectionUtils;
089import org.apache.hadoop.hbase.util.Threads;
090import org.apache.hadoop.ipc.RemoteException;
091import org.apache.hadoop.security.UserGroupInformation;
092import org.apache.yetus.audience.InterfaceAudience;
093import org.apache.zookeeper.KeeperException;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
098import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
099import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
100import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
101import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
102
103import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
104import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
130import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
131import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
132import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
133import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
134import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
135import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
136import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
137import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
138import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
139import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
140import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
141import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
142import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
143import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
144import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
145import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
146import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
147import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
148import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
149import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
150import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
151import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
152import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
153import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
154import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
155import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
156import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
157import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
158import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
159import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
160import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
161
162/**
163 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. Encapsulates
164 * connection to zookeeper and regionservers.
165 */
166@edu.umd.cs.findbugs.annotations.SuppressWarnings(
167    value = "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
168    justification = "Access to the conncurrent hash map is under a lock so should be fine.")
169@InterfaceAudience.Private
170public class ConnectionImplementation implements ClusterConnection, Closeable {
171  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
172  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
173
  // The meta replica mode used by this connection (for example HEDGED_READ or LOAD_BALANCE).
  // The default mode is CatalogReplicaMode.NONE.
176  private CatalogReplicaMode metaReplicaMode;
177  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
178
179  private final int metaReplicaCallTimeoutScanInMicroSecond;
180  private final int numTries;
181  final int rpcTimeout;
182
183  /**
184   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope. Once
185   * it's set under nonceGeneratorCreateLock, it is never unset or changed.
186   */
187  // XXX: It is a bad pattern to assign a value to a static field from a constructor. However
188  // it would likely change semantics if we change it because the NonceGenerator is selected
189  // from configuration passed in as a parameter of the constructor. This has been cleaned up
190  // in later branches.
191  private static volatile NonceGenerator nonceGenerator = null;
192  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
193  private static final Object nonceGeneratorCreateLock = new Object();
194
195  private final AsyncProcess asyncProcess;
196  // single tracker per connection
197  private final ServerStatisticTracker stats;
198
199  private volatile boolean closed;
200  private volatile boolean aborted;
201
  // package-private so that tests can access it
203  ClusterStatusListener clusterStatusListener;
204
205  private final Object metaRegionLock = new Object();
206
207  private final Object masterLock = new Object();
208
209  // thread executor shared by all Table instances created
210  // by this connection
211  private volatile ThreadPoolExecutor batchPool = null;
212  // meta thread executor shared by all Table instances created
213  // by this connection
214  private volatile ThreadPoolExecutor metaLookupPool = null;
215  private volatile boolean cleanupPool = false;
216
217  private final Configuration conf;
218
219  // cache the configuration value for tables so that we can avoid calling
220  // the expensive Configuration to fetch the value multiple times.
221  private final ConnectionConfiguration connectionConfig;
222
223  // Client rpc instance.
224  private final RpcClient rpcClient;
225
226  private final MetaCache metaCache;
227  private final MetricsConnection metrics;
228
229  protected User user;
230
231  private final RpcRetryingCallerFactory rpcCallerFactory;
232
233  private final RpcControllerFactory rpcControllerFactory;
234
235  private final RetryingCallerInterceptor interceptor;
236
237  /**
238   * Cluster registry of basic info such as clusterid and meta region location.
239   */
240  private final ConnectionRegistry registry;
241
242  private final ClientBackoffPolicy backoffPolicy;
243
244  /**
245   * Allow setting an alternate BufferedMutator implementation via config. If null, use default.
246   */
247  private final String alternateBufferedMutatorClassName;
248
249  /** lock guards against multiple threads trying to query the meta region at the same time */
250  private final ReentrantLock userRegionLock = new ReentrantLock();
251
252  private ChoreService choreService;
253
254  /**
255   * constructor
256   * @param conf Configuration object
257   */
258  ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
259    this(conf, pool, user, null);
260  }
261
262  /**
263   * Constructor, for creating cluster connection with provided ConnectionRegistry.
264   */
265  ConnectionImplementation(Configuration conf, ExecutorService pool, User user,
266    ConnectionRegistry registry) throws IOException {
267    this.conf = conf;
268    this.user = user;
269    if (user != null && user.isLoginFromKeytab()) {
270      spawnRenewalChore(user.getUGI());
271    }
272    this.batchPool = (ThreadPoolExecutor) pool;
273    this.connectionConfig = new ConnectionConfiguration(conf);
274    this.closed = false;
275    this.metaReplicaCallTimeoutScanInMicroSecond =
276      connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
277
278    // how many times to try, one more than max *retry* time
279    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
280    this.rpcTimeout =
281      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
282    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
283      synchronized (nonceGeneratorCreateLock) {
284        if (nonceGenerator == null) {
285          nonceGenerator = PerClientRandomNonceGenerator.get();
286        }
287      }
288    } else {
289      nonceGenerator = NO_NONCE_GENERATOR;
290    }
291
292    this.stats = ServerStatisticTracker.create(conf);
293    this.interceptor = new RetryingCallerInterceptorFactory(conf).build();
294    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
295    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
296    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
297    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
298
299    boolean shouldListen =
300      conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
301    Class<? extends ClusterStatusListener.Listener> listenerClass =
302      conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
303        ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
304
305    // Is there an alternate BufferedMutator to use?
306    this.alternateBufferedMutatorClassName = this.conf.get(BufferedMutator.CLASSNAME_KEY);
307
308    try {
309      if (registry == null) {
310        this.registry = ConnectionRegistryFactory.getRegistry(conf);
311      } else {
312        this.registry = registry;
313      }
314      retrieveClusterId();
315
316      if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
317        String scope = MetricsConnection.getScope(conf, clusterId, this);
318        this.metrics = new MetricsConnection(scope, this::getBatchPool, this::getMetaLookupPool);
319      } else {
320        this.metrics = null;
321      }
322      this.metaCache = new MetaCache(this.metrics);
323
324      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);
325
326      // Do we publish the status?
327      if (shouldListen) {
328        if (listenerClass == null) {
329          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but "
330            + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
331        } else {
332          clusterStatusListener =
333            new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
334              @Override
335              public void newDead(ServerName sn) {
336                clearCaches(sn);
337                rpcClient.cancelConnections(sn);
338              }
339            }, conf, listenerClass);
340        }
341      }
342    } catch (Throwable e) {
343      // avoid leaks: registry, rpcClient, ...
344      LOG.debug("connection construction failed", e);
345      close();
346      throw e;
347    }
348
349    // Get the region locator's meta replica mode.
350    this.metaReplicaMode = CatalogReplicaMode
351      .fromString(conf.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
352
353    switch (this.metaReplicaMode) {
354      case LOAD_BALANCE:
355        String replicaSelectorClass =
356          conf.get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
357            CatalogReplicaLoadBalanceSimpleSelector.class.getName());
358
359        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory
360          .createSelector(replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> {
361            int numOfReplicas = 1;
362            try {
363              RegionLocations metaLocations = this.registry.getMetaRegionLocations()
364                .get(connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
365              numOfReplicas = metaLocations.size();
366            } catch (Exception e) {
367              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
368            }
369            return numOfReplicas;
370          });
371        break;
372      case NONE:
373        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
374
375        boolean useMetaReplicas = conf.getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
376        if (useMetaReplicas) {
377          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
378        }
379        break;
380      default:
381        // Doing nothing
382    }
383  }
384
385  private void spawnRenewalChore(final UserGroupInformation user) {
386    ChoreService service = getChoreService();
387    service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf));
388  }
389
390  /**
391   * @param conn The connection for which to replace the generator.
392   * @param cnm  Replaces the nonce generator used, for testing.
393   * @return old nonce generator.
394   */
395  static NonceGenerator injectNonceGeneratorForTesting(ClusterConnection conn, NonceGenerator cnm) {
396    ConnectionImplementation connImpl = (ConnectionImplementation) conn;
397    NonceGenerator ng = connImpl.getNonceGenerator();
398    LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
399    nonceGenerator = cnm;
400    return ng;
401  }
402
403  @Override
404  public Table getTable(TableName tableName) throws IOException {
405    return getTable(tableName, getBatchPool());
406  }
407
408  @Override
409  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
410    return new TableBuilderBase(tableName, connectionConfig) {
411
412      @Override
413      public Table build() {
414        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
415          rpcControllerFactory, pool);
416      }
417    };
418  }
419
420  @Override
421  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
422    if (params.getTableName() == null) {
423      throw new IllegalArgumentException("TableName cannot be null.");
424    }
425    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
426      params.writeBufferSize(connectionConfig.getWriteBufferSize());
427    }
428    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
429      params.setWriteBufferPeriodicFlushTimeoutMs(
430        connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
431    }
432    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
433      params.setWriteBufferPeriodicFlushTimerTickMs(
434        connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
435    }
436    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
437      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
438    }
439    // Look to see if an alternate BufferedMutation implementation is wanted.
440    // Look in params and in config. If null, use default.
441    String implementationClassName = params.getImplementationClassName();
442    if (implementationClassName == null) {
443      implementationClassName = this.alternateBufferedMutatorClassName;
444    }
445    if (implementationClassName == null) {
446      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
447    }
448    try {
449      return (BufferedMutator) ReflectionUtils.newInstance(Class.forName(implementationClassName),
450        this, rpcCallerFactory, rpcControllerFactory, params);
451    } catch (ClassNotFoundException e) {
452      throw new RuntimeException(e);
453    }
454  }
455
456  @Override
457  public BufferedMutator getBufferedMutator(TableName tableName) {
458    return getBufferedMutator(new BufferedMutatorParams(tableName));
459  }
460
461  @Override
462  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
463    return new HRegionLocator(tableName, this);
464  }
465
466  @Override
467  public Admin getAdmin() throws IOException {
468    return new HBaseAdmin(this);
469  }
470
471  @Override
472  public Hbck getHbck() throws IOException {
473    return TraceUtil.trace(() -> getHbck(get(registry.getActiveMaster())),
474      () -> TraceUtil.createSpan(this.getClass().getSimpleName() + ".getHbck"));
475  }
476
477  @Override
478  public Hbck getHbck(ServerName masterServer) throws IOException {
479    return TraceUtil.trace(() -> {
480      checkClosed();
481      if (isDeadServer(masterServer)) {
482        throw new RegionServerStoppedException(masterServer + " is dead.");
483      }
484      String key =
485        getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), masterServer);
486
487      return new HBaseHbck(
488        (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
489          BlockingRpcChannel channel =
490            this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
491          return MasterProtos.HbckService.newBlockingStub(channel);
492        }), rpcControllerFactory);
493    }, () -> TraceUtil.createSpan(this.getClass().getSimpleName() + ".getHbck")
494      .setAttribute(HBaseSemanticAttributes.SERVER_NAME_KEY, masterServer.getServerName()));
495  }
496
497  @Override
498  public MetricsConnection getConnectionMetrics() {
499    return this.metrics;
500  }
501
502  @Override
503  public User getUser() {
504    return user;
505  }
506
507  @Override
508  public ConnectionRegistry getConnectionRegistry() {
509    return registry;
510  }
511
512  private ThreadPoolExecutor getBatchPool() {
513    if (batchPool == null) {
514      synchronized (this) {
515        if (batchPool == null) {
516          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
517          this.batchPool = getThreadPool(threads, threads, "-shared", null);
518          this.cleanupPool = true;
519        }
520      }
521    }
522    return this.batchPool;
523  }
524
525  private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
526    BlockingQueue<Runnable> passedWorkQueue) {
527    // shared HTable thread executor not yet initialized
528    if (maxThreads == 0) {
529      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
530    }
531    if (coreThreads == 0) {
532      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
533    }
534    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
535    BlockingQueue<Runnable> workQueue = passedWorkQueue;
536    if (workQueue == null) {
537      workQueue =
538        new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
539          HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
540      coreThreads = maxThreads;
541    }
542    ThreadPoolExecutor tpe =
543      new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
544        new ThreadFactoryBuilder().setNameFormat(toString() + nameHint + "-pool-%d")
545          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
546    tpe.allowCoreThreadTimeOut(true);
547    return tpe;
548  }
549
550  private ThreadPoolExecutor getMetaLookupPool() {
551    if (this.metaLookupPool == null) {
552      synchronized (this) {
553        if (this.metaLookupPool == null) {
554          // Some of the threads would be used for meta replicas
555          // To start with, threads.max.core threads can hit the meta (including replicas).
556          // After that, requests will get queued up in the passed queue, and only after
557          // the queue is full, a new thread will be started
558          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
559          this.metaLookupPool =
560            getThreadPool(threads, threads, "-metaLookup-shared-", new LinkedBlockingQueue<>());
561        }
562      }
563    }
564    return this.metaLookupPool;
565  }
566
567  protected ExecutorService getCurrentMetaLookupPool() {
568    return metaLookupPool;
569  }
570
571  protected ExecutorService getCurrentBatchPool() {
572    return batchPool;
573  }
574
575  private void shutdownPools() {
576    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
577      shutdownBatchPool(this.batchPool);
578    }
579    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
580      shutdownBatchPool(this.metaLookupPool);
581    }
582  }
583
584  private void shutdownBatchPool(ExecutorService pool) {
585    pool.shutdown();
586    try {
587      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
588        pool.shutdownNow();
589      }
590    } catch (InterruptedException e) {
591      pool.shutdownNow();
592    }
593  }
594
595  /**
596   * For tests only.
597   */
598  RpcClient getRpcClient() {
599    return rpcClient;
600  }
601
602  /**
603   * An identifier that will remain the same for a given connection.
604   */
605  @Override
606  public String toString() {
607    return "hconnection-0x" + Integer.toHexString(hashCode());
608  }
609
610  protected String clusterId = null;
611
612  protected void retrieveClusterId() {
613    if (clusterId != null) {
614      return;
615    }
616    try {
617      this.clusterId = this.registry.getClusterId().get();
618    } catch (InterruptedException | ExecutionException e) {
619      LOG.warn("Retrieve cluster id failed", e);
620    }
621    if (clusterId == null) {
622      clusterId = HConstants.CLUSTER_ID_DEFAULT;
623      LOG.debug("clusterid came back null, using default " + clusterId);
624    }
625  }
626
627  /**
628   * If choreService has not been created yet, create the ChoreService. n
629   */
630  synchronized ChoreService getChoreService() {
631    if (choreService == null) {
632      choreService = new ChoreService("AsyncConn Chore Service");
633    }
634    return choreService;
635  }
636
637  @Override
638  public Configuration getConfiguration() {
639    return this.conf;
640  }
641
642  private void checkClosed() throws LocalConnectionClosedException {
643    if (this.closed) {
644      throw new LocalConnectionClosedException(toString() + " closed");
645    }
646  }
647
648  /**
649   * Like {@link ConnectionClosedException} but thrown from the checkClosed call which looks at the
650   * local this.closed flag. We use this rather than {@link ConnectionClosedException} because the
651   * latter does not inherit from DoNotRetryIOE (it should. TODO).
652   */
653  private static class LocalConnectionClosedException extends DoNotRetryIOException {
654    LocalConnectionClosedException(String message) {
655      super(message);
656    }
657  }
658
659  /**
660   * @return true if the master is running, throws an exception otherwise
661   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
662   * @deprecated this has been deprecated without a replacement
663   */
664  @Deprecated
665  @Override
666  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
667    // When getting the master connection, we check it's running,
668    // so if there is no exception, it means we've been able to get a
669    // connection on a running master
670    MasterKeepAliveConnection m;
671    try {
672      m = getKeepAliveMasterService();
673    } catch (IOException e) {
674      throw new MasterNotRunningException(e);
675    }
676    m.close();
677    return true;
678  }
679
680  @Override
681  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
682    boolean reload) throws IOException {
683    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
684  }
685
686  @Override
687  public boolean isTableEnabled(TableName tableName) throws IOException {
688    return getTableState(tableName).inStates(TableState.State.ENABLED);
689  }
690
691  @Override
692  public boolean isTableDisabled(TableName tableName) throws IOException {
693    return getTableState(tableName).inStates(TableState.State.DISABLED);
694  }
695
696  @Override
697  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
698    throws IOException {
699    checkClosed();
700    try {
701      if (!isTableEnabled(tableName)) {
702        LOG.debug("Table {} not enabled", tableName);
703        return false;
704      }
705      if (TableName.isMetaTableName(tableName)) {
706        // meta table is always available
707        return true;
708      }
709      List<Pair<RegionInfo, ServerName>> locations =
710        MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);
711
712      int notDeployed = 0;
713      int regionCount = 0;
714      for (Pair<RegionInfo, ServerName> pair : locations) {
715        RegionInfo info = pair.getFirst();
716        if (pair.getSecond() == null) {
717          LOG.debug("Table {} has not deployed region {}", tableName,
718            pair.getFirst().getEncodedName());
719          notDeployed++;
720        } else
721          if (splitKeys != null && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
722            for (byte[] splitKey : splitKeys) {
723              // Just check if the splitkey is available
724              if (Bytes.equals(info.getStartKey(), splitKey)) {
725                regionCount++;
726                break;
727              }
728            }
729          } else {
730            // Always empty start row should be counted
731            regionCount++;
732          }
733      }
734      if (notDeployed > 0) {
735        if (LOG.isDebugEnabled()) {
736          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
737        }
738        return false;
739      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
740        if (LOG.isDebugEnabled()) {
741          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
742            splitKeys.length + 1, regionCount);
743        }
744        return false;
745      } else {
746        LOG.trace("Table {} should be available", tableName);
747        return true;
748      }
749    } catch (TableNotFoundException tnfe) {
750      LOG.warn("Table {} does not exist", tableName);
751      return false;
752    }
753  }
754
755  @Override
756  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
757    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
758      RegionInfo.getStartKey(regionName), false, true);
759    return locations == null ? null : locations.getRegionLocation();
760  }
761
762  private boolean isDeadServer(ServerName sn) {
763    if (clusterStatusListener == null) {
764      return false;
765    } else {
766      return clusterStatusListener.isDeadServer(sn);
767    }
768  }
769
770  @Override
771  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
772    return locateRegions(tableName, false, true);
773  }
774
775  @Override
776  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
777    boolean offlined) throws IOException {
778    List<RegionInfo> regions;
779    if (TableName.isMetaTableName(tableName)) {
780      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
781    } else {
782      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
783    }
784    List<HRegionLocation> locations = new ArrayList<>();
785    for (RegionInfo regionInfo : regions) {
786      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
787        continue;
788      }
789      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
790      if (list != null) {
791        for (HRegionLocation loc : list.getRegionLocations()) {
792          if (loc != null) {
793            locations.add(loc);
794          }
795        }
796      }
797    }
798    return locations;
799  }
800
801  @Override
802  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
803    throws IOException {
804    RegionLocations locations = locateRegion(tableName, row, true, true);
805    return locations == null ? null : locations.getRegionLocation();
806  }
807
808  @Override
809  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
810    throws IOException {
811    RegionLocations locations =
812      relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
813    return locations == null
814      ? null
815      : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
816  }
817
818  @Override
819  public RegionLocations relocateRegion(final TableName tableName, final byte[] row, int replicaId)
820    throws IOException {
821    // Since this is an explicit request not to use any caching, finding
822    // disabled tables should not be desirable. This will ensure that an exception is thrown when
823    // the first time a disabled table is interacted with.
824    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
825      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
826    }
827
828    return locateRegion(tableName, row, false, true, replicaId);
829  }
830
831  @Override
832  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
833    boolean retry) throws IOException {
834    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
835  }
836
837  @Override
838  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
839    boolean retry, int replicaId) throws IOException {
840    checkClosed();
841    if (tableName == null || tableName.getName().length == 0) {
842      throw new IllegalArgumentException("table name cannot be null or zero length");
843    }
844    if (tableName.equals(TableName.META_TABLE_NAME)) {
845      return locateMeta(tableName, useCache, replicaId);
846    } else {
847      // Region not in the cache - have to go to the meta RS
848      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
849    }
850  }
851
852  private RegionLocations locateMeta(final TableName tableName, boolean useCache, int replicaId)
853    throws IOException {
854    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
855    // zookeeper with one request for every region lookup. We cache the META with empty row
856    // key in MetaCache.
857    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
858    RegionLocations locations = null;
859    if (useCache) {
860      locations = getCachedLocation(tableName, metaCacheKey);
861      if (locations != null && locations.getRegionLocation(replicaId) != null) {
862        return locations;
863      }
864    }
865
866    // only one thread should do the lookup.
867    synchronized (metaRegionLock) {
868      // Check the cache again for a hit in case some other thread made the
869      // same query while we were waiting on the lock.
870      if (useCache) {
871        locations = getCachedLocation(tableName, metaCacheKey);
872        if (locations != null && locations.getRegionLocation(replicaId) != null) {
873          return locations;
874        }
875      }
876
877      // Look up from zookeeper
878      locations = get(this.registry.getMetaRegionLocations());
879      if (locations != null) {
880        cacheLocation(tableName, locations);
881      }
882    }
883    return locations;
884  }
885
886  /**
887   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
888   * seeking.
889   */
890  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
891    boolean retry, int replicaId) throws IOException {
892    // If we are supposed to be using the cache, look in the cache to see if we already have the
893    // region.
894    if (useCache) {
895      RegionLocations locations = getCachedLocation(tableName, row);
896      if (locations != null && locations.getRegionLocation(replicaId) != null) {
897        return locations;
898      }
899    }
900    // build the key of the meta region we should be looking for.
901    // the extra 9's on the end are necessary to allow "exact" matches
902    // without knowing the precise region names.
903    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
904    byte[] metaStopKey =
905      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
906    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
907      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(1)
908      .setReadType(ReadType.PREAD);
909
910    switch (this.metaReplicaMode) {
911      case LOAD_BALANCE:
912        int metaReplicaId =
913          this.metaReplicaSelector.select(tableName, row, RegionLocateType.CURRENT);
914        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
915          // If the selector gives a non-primary meta replica region, then go with it.
916          // Otherwise, just go to primary in non-hedgedRead mode.
917          s.setConsistency(Consistency.TIMELINE);
918          s.setReplicaId(metaReplicaId);
919        }
920        break;
921      case HEDGED_READ:
922        s.setConsistency(Consistency.TIMELINE);
923        break;
924      default:
925        // do nothing
926    }
927    int maxAttempts = (retry ? numTries : 1);
928    boolean relocateMeta = false;
929    for (int tries = 0;; tries++) {
930      if (tries >= maxAttempts) {
931        throw new NoServerForRegionException("Unable to find region for "
932          + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
933      }
934      if (useCache) {
935        RegionLocations locations = getCachedLocation(tableName, row);
936        if (locations != null && locations.getRegionLocation(replicaId) != null) {
937          return locations;
938        }
939      } else {
940        // If we are not supposed to be using the cache, delete any existing cached location
941        // so it won't interfere.
942        // We are only supposed to clean the cache for the specific replicaId
943        metaCache.clearCache(tableName, row, replicaId);
944      }
945      // Query the meta region
946      long pauseBase = connectionConfig.getPauseMillis();
947      takeUserRegionLock();
948      try {
949        // We don't need to check if useCache is enabled or not. Even if useCache is false
950        // we already cleared the cache for this row before acquiring userRegion lock so if this
951        // row is present in cache that means some other thread has populated it while we were
952        // waiting to acquire user region lock.
953        RegionLocations locations = getCachedLocation(tableName, row);
954        if (locations != null && locations.getRegionLocation(replicaId) != null) {
955          return locations;
956        }
957        if (relocateMeta) {
958          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
959            RegionInfo.DEFAULT_REPLICA_ID);
960        }
961        s.resetMvccReadPoint();
962        final Span span = new TableOperationSpanBuilder(this)
963          .setTableName(TableName.META_TABLE_NAME).setOperation(s).build();
964        try (Scope ignored = span.makeCurrent();
965          ReversedClientScanner rcs =
966            new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
967              rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(),
968              connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond)) {
969          boolean tableNotFound = true;
970          for (;;) {
971            Result regionInfoRow = rcs.next();
972            if (regionInfoRow == null) {
973              if (tableNotFound) {
974                throw new TableNotFoundException(tableName);
975              } else {
976                throw new IOException(
977                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
978              }
979            }
980            tableNotFound = false;
981            // convert the row result into the HRegionLocation we need!
982            locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
983            if (locations == null || locations.getRegionLocation(replicaId) == null) {
984              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
985            }
986            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
987            if (regionInfo == null) {
988              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME
989                + ", row=" + regionInfoRow);
990            }
991            // See HBASE-20182. It is possible that we locate to a split parent even after the
992            // children are online, so here we need to skip this region and go to the next one.
993            if (regionInfo.isSplitParent()) {
994              continue;
995            }
996            if (regionInfo.isOffline()) {
997              throw new RegionOfflineException(
998                "Region offline; disable table call? " + regionInfo.getRegionNameAsString());
999            }
1000            // It is possible that the split children have not been online yet and we have skipped
1001            // the parent in the above condition, so we may have already reached a region which does
1002            // not contains us.
1003            if (!regionInfo.containsRow(row)) {
1004              throw new IOException(
1005                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
1006            }
1007            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
1008            if (serverName == null) {
1009              throw new NoServerForRegionException("No server address listed in "
1010                + TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString()
1011                + " containing row " + Bytes.toStringBinary(row));
1012            }
1013            if (isDeadServer(serverName)) {
1014              throw new RegionServerStoppedException(
1015                "hbase:meta says the region " + regionInfo.getRegionNameAsString()
1016                  + " is managed by the server " + serverName + ", but it is dead.");
1017            }
1018            // Instantiate the location
1019            cacheLocation(tableName, locations);
1020            return locations;
1021          }
1022        }
1023      } catch (TableNotFoundException e) {
1024        // if we got this error, probably means the table just plain doesn't
1025        // exist. rethrow the error immediately. this should always be coming
1026        // from the HTable constructor.
1027        throw e;
1028      } catch (LocalConnectionClosedException cce) {
1029        // LocalConnectionClosedException is specialized instance of DoNotRetryIOE.
1030        // Thrown when we check if this connection is closed. If it is, don't retry.
1031        throw cce;
1032      } catch (IOException e) {
1033        ExceptionUtil.rethrowIfInterrupt(e);
1034        if (e instanceof RemoteException) {
1035          e = ((RemoteException) e).unwrapRemoteException();
1036        }
1037        if (HBaseServerException.isServerOverloaded(e)) {
1038          // Give a special pause when encountering an exception indicating the server
1039          // is overloaded. see #HBASE-17114 and HBASE-26807
1040          pauseBase = connectionConfig.getPauseMillisForServerOverloaded();
1041        }
1042        if (tries < maxAttempts - 1) {
1043          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying "
1044            + "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
1045        } else {
1046          throw e;
1047        }
1048        // Only relocate the parent region if necessary
1049        relocateMeta =
1050          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
1051      } finally {
1052        userRegionLock.unlock();
1053      }
1054      try {
1055        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
1056      } catch (InterruptedException e) {
1057        throw new InterruptedIOException(
1058          "Giving up trying to location region in " + "meta: thread is interrupted.");
1059      }
1060    }
1061  }
1062
1063  void takeUserRegionLock() throws IOException {
1064    try {
1065      long waitTime = connectionConfig.getMetaOperationTimeout();
1066      if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
1067        throw new LockTimeoutException("Failed to get user region lock in" + waitTime + " ms. "
1068          + " for accessing meta region server.");
1069      }
1070    } catch (InterruptedException ie) {
1071      LOG.error("Interrupted while waiting for a lock", ie);
1072      throw ExceptionUtil.asInterrupt(ie);
1073    }
1074  }
1075
1076  /**
1077   * Put a newly discovered HRegionLocation into the cache.
1078   * @param tableName The table name.
1079   * @param location  the new location
1080   */
1081  @Override
1082  public void cacheLocation(final TableName tableName, final RegionLocations location) {
1083    metaCache.cacheLocation(tableName, location);
1084  }
1085
1086  /**
1087   * Search the cache for a location that fits our table and row key. Return null if no suitable
1088   * region is located.
1089   * @return Null or region location found in cache.
1090   */
1091  RegionLocations getCachedLocation(final TableName tableName, final byte[] row) {
1092    return metaCache.getCachedLocation(tableName, row);
1093  }
1094
1095  public void clearRegionCache(final TableName tableName, byte[] row) {
1096    metaCache.clearCache(tableName, row);
1097  }
1098
1099  /*
1100   * Delete all cached entries of a table that maps to a specific location.
1101   */
1102  @Override
1103  public void clearCaches(final ServerName serverName) {
1104    metaCache.clearCache(serverName);
1105  }
1106
1107  @Override
1108  public void clearRegionLocationCache() {
1109    metaCache.clearCache();
1110  }
1111
1112  @Override
1113  public void clearRegionCache(final TableName tableName) {
1114    metaCache.clearCache(tableName);
1115  }
1116
1117  /**
1118   * Put a newly discovered HRegionLocation into the cache.
1119   * @param tableName The table name.
1120   * @param source    the source of the new location, if it's not coming from meta
1121   * @param location  the new location
1122   */
1123  private void cacheLocation(final TableName tableName, final ServerName source,
1124    final HRegionLocation location) {
1125    metaCache.cacheLocation(tableName, source, location);
1126  }
1127
  // Cache of service stubs, keyed by the string getStubKey(serviceName, serverName) produces.
  // Stubs of different service types share this map, hence the Object value type; readers cast
  // the value back to the interface they asked for.
  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
1130
1131  /**
1132   * State of the MasterService connection/setup.
1133   */
1134  static class MasterServiceState {
1135    Connection connection;
1136
1137    MasterProtos.MasterService.BlockingInterface stub;
1138    int userCount;
1139
1140    MasterServiceState(final Connection connection) {
1141      super();
1142      this.connection = connection;
1143    }
1144
1145    @Override
1146    public String toString() {
1147      return "MasterService";
1148    }
1149
1150    Object getStub() {
1151      return this.stub;
1152    }
1153
1154    void clearStub() {
1155      this.stub = null;
1156    }
1157
1158    boolean isMasterRunning() throws IOException {
1159      MasterProtos.IsMasterRunningResponse response = null;
1160      try {
1161        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1162      } catch (Exception e) {
1163        throw ProtobufUtil.handleRemoteException(e);
1164      }
1165      return response != null ? response.getIsMasterRunning() : false;
1166    }
1167  }
1168
1169  /**
1170   * The record of errors for servers.
1171   */
1172  static class ServerErrorTracker {
1173    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
1174    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
1175      new ConcurrentHashMap<>();
1176    private final long canRetryUntil;
1177    private final int maxTries;// max number to try
1178    private final long startTrackingTime;
1179
1180    /**
1181     * Constructor
1182     * @param timeout  how long to wait before timeout, in unit of millisecond
1183     * @param maxTries how many times to try
1184     */
1185    @SuppressWarnings("JavaUtilDate")
1186    public ServerErrorTracker(long timeout, int maxTries) {
1187      this.maxTries = maxTries;
1188      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
1189      this.startTrackingTime = new Date().getTime();
1190    }
1191
1192    /**
1193     * We stop to retry when we have exhausted BOTH the number of tries and the time allocated.
1194     * @param numAttempt how many times we have tried by now
1195     */
1196    boolean canTryMore(int numAttempt) {
1197      // If there is a single try we must not take into account the time.
1198      return numAttempt < maxTries
1199        || (maxTries > 1 && EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
1200    }
1201
1202    /**
1203     * Calculates the back-off time for a retrying request to a particular server.
1204     * @param server    The server in question.
1205     * @param basePause The default hci pause.
1206     * @return The time to wait before sending next request.
1207     */
1208    long calculateBackoffTime(ServerName server, long basePause) {
1209      long result;
1210      ServerErrors errorStats = errorsByServer.get(server);
1211      if (errorStats != null) {
1212        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
1213      } else {
1214        result = 0; // yes, if the server is not in our list we don't wait before retrying.
1215      }
1216      return result;
1217    }
1218
1219    /**
1220     * Reports that there was an error on the server to do whatever bean-counting necessary.
1221     * @param server The server in question.
1222     */
1223    void reportServerError(ServerName server) {
1224      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
1225    }
1226
1227    long getStartTrackingTime() {
1228      return startTrackingTime;
1229    }
1230
1231    /**
1232     * The record of errors for a server.
1233     */
1234    private static class ServerErrors {
1235      private final AtomicInteger retries = new AtomicInteger(0);
1236
1237      public int getCount() {
1238        return retries.get();
1239      }
1240
1241      public void addError() {
1242        retries.incrementAndGet();
1243      }
1244    }
1245  }
1246
1247  /**
1248   * Class to make a MasterServiceStubMaker stub.
1249   */
1250  private final class MasterServiceStubMaker {
1251
1252    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
1253      throws IOException {
1254      try {
1255        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1256      } catch (ServiceException e) {
1257        throw ProtobufUtil.handleRemoteException(e);
1258      }
1259    }
1260
1261    /**
1262     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
1263     * services nor their interfaces. Let the caller do appropriate casting.
1264     * @return A stub for master services.
1265     */
1266    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
1267      throws IOException, KeeperException {
1268      ServerName sn = get(registry.getActiveMaster());
1269      if (sn == null) {
1270        String msg = "ZooKeeper available but no active master location found";
1271        LOG.info(msg);
1272        throw new MasterNotRunningException(msg);
1273      }
1274      if (isDeadServer(sn)) {
1275        throw new MasterNotRunningException(sn + " is dead.");
1276      }
1277      // Use the security info interface name as our stub key
1278      String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn);
1279      MasterProtos.MasterService.BlockingInterface stub =
1280        (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1281          BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1282          return MasterProtos.MasterService.newBlockingStub(channel);
1283        });
1284      isMasterRunning(stub);
1285      return stub;
1286    }
1287
1288    /**
1289     * Create a stub against the master. Retry if necessary.
1290     * @return A stub to do <code>intf</code> against the master
1291     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
1292     */
1293    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
1294      // The lock must be at the beginning to prevent multiple master creations
1295      // (and leaks) in a multithread context
1296      synchronized (masterLock) {
1297        Exception exceptionCaught = null;
1298        if (!closed) {
1299          try {
1300            return makeStubNoRetries();
1301          } catch (IOException e) {
1302            exceptionCaught = e;
1303          } catch (KeeperException e) {
1304            exceptionCaught = e;
1305          }
1306          throw new MasterNotRunningException(exceptionCaught);
1307        } else {
1308          throw new DoNotRetryIOException("Connection was closed while trying to get master");
1309        }
1310      }
1311    }
1312  }
1313
1314  @Override
1315  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
1316    return getAdmin(get(registry.getActiveMaster()));
1317  }
1318
1319  @Override
1320  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
1321    throws IOException {
1322    checkClosed();
1323    if (isDeadServer(serverName)) {
1324      throw new RegionServerStoppedException(serverName + " is dead.");
1325    }
1326    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName);
1327    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1328      BlockingRpcChannel channel =
1329        this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1330      return AdminProtos.AdminService.newBlockingStub(channel);
1331    });
1332  }
1333
1334  @Override
1335  public BlockingInterface getClient(ServerName serverName) throws IOException {
1336    checkClosed();
1337    if (isDeadServer(serverName)) {
1338      throw new RegionServerStoppedException(serverName + " is dead.");
1339    }
1340    String key =
1341      getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), serverName);
1342    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1343      BlockingRpcChannel channel =
1344        this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1345      return ClientProtos.ClientService.newBlockingStub(channel);
1346    });
1347  }
1348
  // Master service state of this connection. Its stub and user count are updated by
  // getKeepAliveMasterService() while holding masterLock.
  final MasterServiceState masterServiceState = new MasterServiceState(this);
1350
1351  @Override
1352  public MasterKeepAliveConnection getMaster() throws IOException {
1353    return getKeepAliveMasterService();
1354  }
1355
1356  private void resetMasterServiceState(final MasterServiceState mss) {
1357    mss.userCount++;
1358  }
1359
1360  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
1361    synchronized (masterLock) {
1362      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1363        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1364        this.masterServiceState.stub = stubMaker.makeStub();
1365      }
1366      resetMasterServiceState(this.masterServiceState);
1367    }
1368    // Ugly delegation just so we can add in a Close method.
1369    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
1370    return new MasterKeepAliveConnection() {
1371      MasterServiceState mss = masterServiceState;
1372
1373      @Override
1374      public MasterProtos.AbortProcedureResponse abortProcedure(RpcController controller,
1375        MasterProtos.AbortProcedureRequest request) throws ServiceException {
1376        return stub.abortProcedure(controller, request);
1377      }
1378
1379      @Override
1380      public MasterProtos.GetProceduresResponse getProcedures(RpcController controller,
1381        MasterProtos.GetProceduresRequest request) throws ServiceException {
1382        return stub.getProcedures(controller, request);
1383      }
1384
1385      @Override
1386      public MasterProtos.GetLocksResponse getLocks(RpcController controller,
1387        MasterProtos.GetLocksRequest request) throws ServiceException {
1388        return stub.getLocks(controller, request);
1389      }
1390
1391      @Override
1392      public MasterProtos.AddColumnResponse addColumn(RpcController controller,
1393        MasterProtos.AddColumnRequest request) throws ServiceException {
1394        return stub.addColumn(controller, request);
1395      }
1396
1397      @Override
1398      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
1399        MasterProtos.DeleteColumnRequest request) throws ServiceException {
1400        return stub.deleteColumn(controller, request);
1401      }
1402
1403      @Override
1404      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
1405        MasterProtos.ModifyColumnRequest request) throws ServiceException {
1406        return stub.modifyColumn(controller, request);
1407      }
1408
1409      @Override
1410      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
1411        MasterProtos.MoveRegionRequest request) throws ServiceException {
1412        return stub.moveRegion(controller, request);
1413      }
1414
1415      @Override
1416      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(RpcController controller,
1417        MasterProtos.MergeTableRegionsRequest request) throws ServiceException {
1418        return stub.mergeTableRegions(controller, request);
1419      }
1420
1421      @Override
1422      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
1423        MasterProtos.AssignRegionRequest request) throws ServiceException {
1424        return stub.assignRegion(controller, request);
1425      }
1426
1427      @Override
1428      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
1429        MasterProtos.UnassignRegionRequest request) throws ServiceException {
1430        return stub.unassignRegion(controller, request);
1431      }
1432
1433      @Override
1434      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
1435        MasterProtos.OfflineRegionRequest request) throws ServiceException {
1436        return stub.offlineRegion(controller, request);
1437      }
1438
1439      @Override
1440      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
1441        MasterProtos.SplitTableRegionRequest request) throws ServiceException {
1442        return stub.splitRegion(controller, request);
1443      }
1444
1445      @Override
1446      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
1447        MasterProtos.DeleteTableRequest request) throws ServiceException {
1448        return stub.deleteTable(controller, request);
1449      }
1450
1451      @Override
1452      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
1453        MasterProtos.TruncateTableRequest request) throws ServiceException {
1454        return stub.truncateTable(controller, request);
1455      }
1456
1457      @Override
1458      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
1459        MasterProtos.EnableTableRequest request) throws ServiceException {
1460        return stub.enableTable(controller, request);
1461      }
1462
1463      @Override
1464      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
1465        MasterProtos.DisableTableRequest request) throws ServiceException {
1466        return stub.disableTable(controller, request);
1467      }
1468
1469      @Override
1470      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
1471        MasterProtos.ModifyTableRequest request) throws ServiceException {
1472        return stub.modifyTable(controller, request);
1473      }
1474
1475      @Override
1476      public MasterProtos.CreateTableResponse createTable(RpcController controller,
1477        MasterProtos.CreateTableRequest request) throws ServiceException {
1478        return stub.createTable(controller, request);
1479      }
1480
1481      @Override
1482      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
1483        MasterProtos.ShutdownRequest request) throws ServiceException {
1484        return stub.shutdown(controller, request);
1485      }
1486
1487      @Override
1488      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
1489        MasterProtos.StopMasterRequest request) throws ServiceException {
1490        return stub.stopMaster(controller, request);
1491      }
1492
1493      @Override
1494      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1495        final RpcController controller, final MasterProtos.IsInMaintenanceModeRequest request)
1496        throws ServiceException {
1497        return stub.isMasterInMaintenanceMode(controller, request);
1498      }
1499
1500      @Override
1501      public MasterProtos.BalanceResponse balance(RpcController controller,
1502        MasterProtos.BalanceRequest request) throws ServiceException {
1503        return stub.balance(controller, request);
1504      }
1505
1506      @Override
1507      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(RpcController controller,
1508        MasterProtos.SetBalancerRunningRequest request) throws ServiceException {
1509        return stub.setBalancerRunning(controller, request);
1510      }
1511
1512      @Override
1513      public NormalizeResponse normalize(RpcController controller, NormalizeRequest request)
1514        throws ServiceException {
1515        return stub.normalize(controller, request);
1516      }
1517
1518      @Override
1519      public SetNormalizerRunningResponse setNormalizerRunning(RpcController controller,
1520        SetNormalizerRunningRequest request) throws ServiceException {
1521        return stub.setNormalizerRunning(controller, request);
1522      }
1523
1524      @Override
1525      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
1526        MasterProtos.RunCatalogScanRequest request) throws ServiceException {
1527        return stub.runCatalogScan(controller, request);
1528      }
1529
1530      @Override
1531      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
1532        RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
1533        throws ServiceException {
1534        return stub.enableCatalogJanitor(controller, request);
1535      }
1536
1537      @Override
1538      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1539        RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
1540        throws ServiceException {
1541        return stub.isCatalogJanitorEnabled(controller, request);
1542      }
1543
1544      @Override
1545      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
1546        MasterProtos.RunCleanerChoreRequest request) throws ServiceException {
1547        return stub.runCleanerChore(controller, request);
1548      }
1549
1550      @Override
1551      public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning(
1552        RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request)
1553        throws ServiceException {
1554        return stub.setCleanerChoreRunning(controller, request);
1555      }
1556
1557      @Override
1558      public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled(
1559        RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request)
1560        throws ServiceException {
1561        return stub.isCleanerChoreEnabled(controller, request);
1562      }
1563
1564      @Override
1565      public ClientProtos.CoprocessorServiceResponse execMasterService(RpcController controller,
1566        ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
1567        return stub.execMasterService(controller, request);
1568      }
1569
1570      @Override
1571      public MasterProtos.SnapshotResponse snapshot(RpcController controller,
1572        MasterProtos.SnapshotRequest request) throws ServiceException {
1573        return stub.snapshot(controller, request);
1574      }
1575
1576      @Override
1577      public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
1578        RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
1579        throws ServiceException {
1580        return stub.getCompletedSnapshots(controller, request);
1581      }
1582
1583      @Override
1584      public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1585        MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
1586        return stub.deleteSnapshot(controller, request);
1587      }
1588
1589      @Override
1590      public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1591        MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
1592        return stub.isSnapshotDone(controller, request);
1593      }
1594
1595      @Override
1596      public MasterProtos.RestoreSnapshotResponse restoreSnapshot(RpcController controller,
1597        MasterProtos.RestoreSnapshotRequest request) throws ServiceException {
1598        return stub.restoreSnapshot(controller, request);
1599      }
1600
1601      @Override
1602      public MasterProtos.SetSnapshotCleanupResponse switchSnapshotCleanup(RpcController controller,
1603        MasterProtos.SetSnapshotCleanupRequest request) throws ServiceException {
1604        return stub.switchSnapshotCleanup(controller, request);
1605      }
1606
1607      @Override
1608      public MasterProtos.IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
1609        RpcController controller, MasterProtos.IsSnapshotCleanupEnabledRequest request)
1610        throws ServiceException {
1611        return stub.isSnapshotCleanupEnabled(controller, request);
1612      }
1613
1614      @Override
1615      public MasterProtos.ExecProcedureResponse execProcedure(RpcController controller,
1616        MasterProtos.ExecProcedureRequest request) throws ServiceException {
1617        return stub.execProcedure(controller, request);
1618      }
1619
1620      @Override
1621      public MasterProtos.ExecProcedureResponse execProcedureWithRet(RpcController controller,
1622        MasterProtos.ExecProcedureRequest request) throws ServiceException {
1623        return stub.execProcedureWithRet(controller, request);
1624      }
1625
1626      @Override
1627      public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
1628        MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
1629        return stub.isProcedureDone(controller, request);
1630      }
1631
1632      @Override
1633      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
1634        MasterProtos.GetProcedureResultRequest request) throws ServiceException {
1635        return stub.getProcedureResult(controller, request);
1636      }
1637
1638      @Override
1639      public MasterProtos.IsMasterRunningResponse isMasterRunning(RpcController controller,
1640        MasterProtos.IsMasterRunningRequest request) throws ServiceException {
1641        return stub.isMasterRunning(controller, request);
1642      }
1643
1644      @Override
1645      public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller,
1646        MasterProtos.ModifyNamespaceRequest request) throws ServiceException {
1647        return stub.modifyNamespace(controller, request);
1648      }
1649
1650      @Override
1651      public MasterProtos.CreateNamespaceResponse createNamespace(RpcController controller,
1652        MasterProtos.CreateNamespaceRequest request) throws ServiceException {
1653        return stub.createNamespace(controller, request);
1654      }
1655
1656      @Override
1657      public MasterProtos.DeleteNamespaceResponse deleteNamespace(RpcController controller,
1658        MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
1659        return stub.deleteNamespace(controller, request);
1660      }
1661
1662      @Override
1663      public MasterProtos.ListNamespacesResponse listNamespaces(RpcController controller,
1664        MasterProtos.ListNamespacesRequest request) throws ServiceException {
1665        return stub.listNamespaces(controller, request);
1666      }
1667
1668      @Override
1669      public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
1670        RpcController controller, MasterProtos.GetNamespaceDescriptorRequest request)
1671        throws ServiceException {
1672        return stub.getNamespaceDescriptor(controller, request);
1673      }
1674
1675      @Override
1676      public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
1677        RpcController controller, MasterProtos.ListNamespaceDescriptorsRequest request)
1678        throws ServiceException {
1679        return stub.listNamespaceDescriptors(controller, request);
1680      }
1681
1682      @Override
1683      public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
1684        RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
1685        throws ServiceException {
1686        return stub.listTableDescriptorsByNamespace(controller, request);
1687      }
1688
1689      @Override
1690      public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
1691        RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
1692        throws ServiceException {
1693        return stub.listTableNamesByNamespace(controller, request);
1694      }
1695
1696      @Override
1697      public MasterProtos.GetTableStateResponse getTableState(RpcController controller,
1698        MasterProtos.GetTableStateRequest request) throws ServiceException {
1699        return stub.getTableState(controller, request);
1700      }
1701
1702      @Override
1703      public void close() {
1704        release(this.mss);
1705      }
1706
1707      @Override
1708      public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
1709        RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
1710        throws ServiceException {
1711        return stub.getSchemaAlterStatus(controller, request);
1712      }
1713
1714      @Override
1715      public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(RpcController controller,
1716        MasterProtos.GetTableDescriptorsRequest request) throws ServiceException {
1717        return stub.getTableDescriptors(controller, request);
1718      }
1719
1720      @Override
1721      public MasterProtos.GetTableNamesResponse getTableNames(RpcController controller,
1722        MasterProtos.GetTableNamesRequest request) throws ServiceException {
1723        return stub.getTableNames(controller, request);
1724      }
1725
1726      @Override
1727      public MasterProtos.GetClusterStatusResponse getClusterStatus(RpcController controller,
1728        MasterProtos.GetClusterStatusRequest request) throws ServiceException {
1729        return stub.getClusterStatus(controller, request);
1730      }
1731
1732      @Override
1733      public MasterProtos.SetQuotaResponse setQuota(RpcController controller,
1734        MasterProtos.SetQuotaRequest request) throws ServiceException {
1735        return stub.setQuota(controller, request);
1736      }
1737
1738      @Override
1739      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
1740        RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
1741        throws ServiceException {
1742        return stub.getLastMajorCompactionTimestamp(controller, request);
1743      }
1744
1745      @Override
1746      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
1747        RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
1748        throws ServiceException {
1749        return stub.getLastMajorCompactionTimestampForRegion(controller, request);
1750      }
1751
1752      @Override
1753      public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
1754        IsBalancerEnabledRequest request) throws ServiceException {
1755        return stub.isBalancerEnabled(controller, request);
1756      }
1757
1758      @Override
1759      public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
1760        RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request)
1761        throws ServiceException {
1762        return stub.setSplitOrMergeEnabled(controller, request);
1763      }
1764
1765      @Override
1766      public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
1767        RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request)
1768        throws ServiceException {
1769        return stub.isSplitOrMergeEnabled(controller, request);
1770      }
1771
1772      @Override
1773      public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
1774        IsNormalizerEnabledRequest request) throws ServiceException {
1775        return stub.isNormalizerEnabled(controller, request);
1776      }
1777
1778      @Override
1779      public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
1780        SecurityCapabilitiesRequest request) throws ServiceException {
1781        return stub.getSecurityCapabilities(controller, request);
1782      }
1783
1784      @Override
1785      public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
1786        AddReplicationPeerRequest request) throws ServiceException {
1787        return stub.addReplicationPeer(controller, request);
1788      }
1789
1790      @Override
1791      public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
1792        RemoveReplicationPeerRequest request) throws ServiceException {
1793        return stub.removeReplicationPeer(controller, request);
1794      }
1795
1796      @Override
1797      public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
1798        EnableReplicationPeerRequest request) throws ServiceException {
1799        return stub.enableReplicationPeer(controller, request);
1800      }
1801
1802      @Override
1803      public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
1804        DisableReplicationPeerRequest request) throws ServiceException {
1805        return stub.disableReplicationPeer(controller, request);
1806      }
1807
1808      @Override
1809      public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
1810        RpcController controller, ListDecommissionedRegionServersRequest request)
1811        throws ServiceException {
1812        return stub.listDecommissionedRegionServers(controller, request);
1813      }
1814
1815      @Override
1816      public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
1817        DecommissionRegionServersRequest request) throws ServiceException {
1818        return stub.decommissionRegionServers(controller, request);
1819      }
1820
1821      @Override
1822      public RecommissionRegionServerResponse recommissionRegionServer(RpcController controller,
1823        RecommissionRegionServerRequest request) throws ServiceException {
1824        return stub.recommissionRegionServer(controller, request);
1825      }
1826
1827      @Override
1828      public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
1829        GetReplicationPeerConfigRequest request) throws ServiceException {
1830        return stub.getReplicationPeerConfig(controller, request);
1831      }
1832
1833      @Override
1834      public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig(
1835        RpcController controller, UpdateReplicationPeerConfigRequest request)
1836        throws ServiceException {
1837        return stub.updateReplicationPeerConfig(controller, request);
1838      }
1839
1840      @Override
1841      public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
1842        ListReplicationPeersRequest request) throws ServiceException {
1843        return stub.listReplicationPeers(controller, request);
1844      }
1845
1846      @Override
1847      public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller,
1848        GetSpaceQuotaRegionSizesRequest request) throws ServiceException {
1849        return stub.getSpaceQuotaRegionSizes(controller, request);
1850      }
1851
1852      @Override
1853      public GetQuotaStatesResponse getQuotaStates(RpcController controller,
1854        GetQuotaStatesRequest request) throws ServiceException {
1855        return stub.getQuotaStates(controller, request);
1856      }
1857
1858      @Override
1859      public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller,
1860        MasterProtos.ClearDeadServersRequest request) throws ServiceException {
1861        return stub.clearDeadServers(controller, request);
1862      }
1863
1864      @Override
1865      public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
1866        SwitchRpcThrottleRequest request) throws ServiceException {
1867        return stub.switchRpcThrottle(controller, request);
1868      }
1869
1870      @Override
1871      public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
1872        IsRpcThrottleEnabledRequest request) throws ServiceException {
1873        return stub.isRpcThrottleEnabled(controller, request);
1874      }
1875
1876      @Override
1877      public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller,
1878        SwitchExceedThrottleQuotaRequest request) throws ServiceException {
1879        return stub.switchExceedThrottleQuota(controller, request);
1880      }
1881
1882      @Override
1883      public AccessControlProtos.GrantResponse grant(RpcController controller,
1884        AccessControlProtos.GrantRequest request) throws ServiceException {
1885        return stub.grant(controller, request);
1886      }
1887
1888      @Override
1889      public AccessControlProtos.RevokeResponse revoke(RpcController controller,
1890        AccessControlProtos.RevokeRequest request) throws ServiceException {
1891        return stub.revoke(controller, request);
1892      }
1893
1894      @Override
1895      public GetUserPermissionsResponse getUserPermissions(RpcController controller,
1896        GetUserPermissionsRequest request) throws ServiceException {
1897        return stub.getUserPermissions(controller, request);
1898      }
1899
1900      @Override
1901      public HasUserPermissionsResponse hasUserPermissions(RpcController controller,
1902        HasUserPermissionsRequest request) throws ServiceException {
1903        return stub.hasUserPermissions(controller, request);
1904      }
1905
1906      @Override
1907      public HBaseProtos.LogEntry getLogEntries(RpcController controller,
1908        HBaseProtos.LogRequest request) throws ServiceException {
1909        return stub.getLogEntries(controller, request);
1910      }
1911
1912      @Override
1913      public ModifyTableStoreFileTrackerResponse modifyTableStoreFileTracker(
1914        RpcController controller, ModifyTableStoreFileTrackerRequest request)
1915        throws ServiceException {
1916        return stub.modifyTableStoreFileTracker(controller, request);
1917      }
1918
1919      @Override
1920      public ModifyColumnStoreFileTrackerResponse modifyColumnStoreFileTracker(
1921        RpcController controller, ModifyColumnStoreFileTrackerRequest request)
1922        throws ServiceException {
1923        return stub.modifyColumnStoreFileTracker(controller, request);
1924      }
1925
1926      @Override
1927      public FlushMasterStoreResponse flushMasterStore(RpcController controller,
1928        FlushMasterStoreRequest request) throws ServiceException {
1929        return stub.flushMasterStore(controller, request);
1930      }
1931    };
1932  }
1933
1934  private static void release(MasterServiceState mss) {
1935    if (mss != null && mss.connection != null) {
1936      ((ConnectionImplementation) mss.connection).releaseMaster(mss);
1937    }
1938  }
1939
1940  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
1941    if (mss.getStub() == null) {
1942      return false;
1943    }
1944    try {
1945      return mss.isMasterRunning();
1946    } catch (UndeclaredThrowableException e) {
1947      // It's somehow messy, but we can receive exceptions such as
1948      // java.net.ConnectException but they're not declared. So we catch it...
1949      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
1950      return false;
1951    } catch (IOException se) {
1952      LOG.warn("Checking master connection", se);
1953      return false;
1954    }
1955  }
1956
1957  void releaseMaster(MasterServiceState mss) {
1958    if (mss.getStub() == null) {
1959      return;
1960    }
1961    synchronized (masterLock) {
1962      --mss.userCount;
1963    }
1964  }
1965
1966  private void closeMasterService(MasterServiceState mss) {
1967    if (mss.getStub() != null) {
1968      LOG.info("Closing master protocol: " + mss);
1969      mss.clearStub();
1970    }
1971    mss.userCount = 0;
1972  }
1973
1974  /**
1975   * Immediate close of the shared master. Can be by the delayed close or when closing the
1976   * connection itself.
1977   */
1978  private void closeMaster() {
1979    synchronized (masterLock) {
1980      closeMasterService(masterServiceState);
1981    }
1982  }
1983
1984  void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) {
1985    HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
1986    cacheLocation(hri.getTable(), source, newHrl);
1987  }
1988
1989  @Override
1990  public void deleteCachedRegionLocation(final HRegionLocation location) {
1991    metaCache.clearCache(location);
1992  }
1993
1994  /**
1995   * Update the location with the new value (if the exception is a RegionMovedException) or delete
1996   * it from the cache. Does nothing if we can be sure from the exception that the location is still
1997   * accurate, or if the cache has already been updated.
1998   * @param exception an object (to simplify user code) on which we will try to find a nested or
1999   *                  wrapped or both RegionMovedException
2000   * @param source    server that is the source of the location update.
2001   */
2002  @Override
2003  public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
2004    final Object exception, final ServerName source) {
2005    if (rowkey == null || tableName == null) {
2006      LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey)
2007        + ", tableName=" + (tableName == null ? "null" : tableName));
2008      return;
2009    }
2010
2011    if (source == null) {
2012      // This should not happen, but let's secure ourselves.
2013      return;
2014    }
2015
2016    if (regionName == null) {
2017      // we do not know which region, so just remove the cache entry for the row and server
2018      if (metrics != null) {
2019        metrics.incrCacheDroppingExceptions(exception);
2020      }
2021      metaCache.clearCache(tableName, rowkey, source);
2022      return;
2023    }
2024
2025    // Is it something we have already updated?
2026    final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2027    HRegionLocation oldLocation = null;
2028    if (oldLocations != null) {
2029      oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2030    }
2031    if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2032      // There is no such location in the cache (it's been removed already) or
2033      // the cache has already been refreshed with a different location. => nothing to do
2034      return;
2035    }
2036
2037    RegionInfo regionInfo = oldLocation.getRegion();
2038    Throwable cause = ClientExceptionsUtil.findException(exception);
2039    if (cause != null) {
2040      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2041        // We know that the region is still on this region server
2042        return;
2043      }
2044
2045      if (cause instanceof RegionMovedException) {
2046        RegionMovedException rme = (RegionMovedException) cause;
2047        if (LOG.isTraceEnabled()) {
2048          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to "
2049            + rme.getHostname() + ":" + rme.getPort() + " according to " + source.getAddress());
2050        }
2051        // We know that the region is not anymore on this region server, but we know
2052        // the new location.
2053        updateCachedLocation(regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2054        return;
2055      }
2056    }
2057
2058    if (metrics != null) {
2059      metrics.incrCacheDroppingExceptions(exception);
2060    }
2061
2062    // Tell metaReplicaSelector that the location is stale. It will create a stale entry
2063    // with timestamp internally. Next time the client looks up the same location,
2064    // it will pick a different meta replica region.
2065    if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
2066      metaReplicaSelector.onError(oldLocation);
2067    }
2068
2069    // If we're here, it means that can cannot be sure about the location, so we remove it from
2070    // the cache. Do not send the source because source can be a new server in the same host:port
2071    metaCache.clearCache(regionInfo);
2072  }
2073
2074  @Override
2075  public AsyncProcess getAsyncProcess() {
2076    return asyncProcess;
2077  }
2078
2079  @Override
2080  public ServerStatisticTracker getStatisticsTracker() {
2081    return this.stats;
2082  }
2083
2084  @Override
2085  public ClientBackoffPolicy getBackoffPolicy() {
2086    return this.backoffPolicy;
2087  }
2088
2089  /*
2090   * Return the number of cached region for a table. It will only be called from a unit test.
2091   */
2092  int getNumberOfCachedRegionLocations(final TableName tableName) {
2093    return metaCache.getNumberOfCachedRegionLocations(tableName);
2094  }
2095
2096  @Override
2097  public void abort(final String msg, Throwable t) {
2098    if (t != null) {
2099      LOG.error(HBaseMarkers.FATAL, msg, t);
2100    } else {
2101      LOG.error(HBaseMarkers.FATAL, msg);
2102    }
2103    this.aborted = true;
2104    close();
2105    this.closed = true;
2106  }
2107
2108  @Override
2109  public boolean isClosed() {
2110    return this.closed;
2111  }
2112
2113  @Override
2114  public boolean isAborted() {
2115    return this.aborted;
2116  }
2117
  /**
   * Closes this connection and releases what it holds: the shared master service, the pools, the
   * metrics, the registry, the cached stubs, the cluster status listener, the RPC client and the
   * chore service. The work is handed to {@code TraceUtil.trace} together with a label built from
   * this class's simple name plus ".close". Returns without doing anything if {@code closed} is
   * already set.
   */
  @Override
  public void close() {
    TraceUtil.trace(() -> {
      if (this.closed) {
        // A previous call already ran the shutdown sequence.
        return;
      }
      closeMaster();
      shutdownPools();
      if (this.metrics != null) {
        this.metrics.shutdown();
      }
      // The flag is raised here, before the remaining resources below are released.
      this.closed = true;
      if (this.registry != null) {
        registry.close();
      }
      this.stubs.clear();
      if (clusterStatusListener != null) {
        clusterStatusListener.close();
      }
      if (rpcClient != null) {
        rpcClient.close();
      }
      // choreService is read and shut down while holding this connection's monitor.
      synchronized (this) {
        if (choreService != null) {
          choreService.shutdown();
        }
      }
    }, this.getClass().getSimpleName() + ".close");
  }
2147
2148  /**
2149   * Close the connection for good. On the off chance that someone is unable to close the
2150   * connection, perhaps because it bailed out prematurely, the method below will ensure that this
2151   * instance is cleaned up. Caveat: The JVM may take an unknown amount of time to call finalize on
2152   * an unreachable object, so our hope is that every consumer cleans up after itself, like any good
2153   * citizen.
2154   */
2155  @Override
2156  protected void finalize() throws Throwable {
2157    super.finalize();
2158    close();
2159  }
2160
2161  @Override
2162  public NonceGenerator getNonceGenerator() {
2163    return nonceGenerator;
2164  }
2165
2166  @Override
2167  public TableState getTableState(TableName tableName) throws IOException {
2168    checkClosed();
2169    TableState tableState = MetaTableAccessor.getTableState(this, tableName);
2170    if (tableState == null) {
2171      throw new TableNotFoundException(tableName);
2172    }
2173    return tableState;
2174  }
2175
2176  @Override
2177  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2178    return RpcRetryingCallerFactory.instantiate(conf, this.interceptor,
2179      this.getStatisticsTracker());
2180  }
2181
2182  @Override
2183  public boolean hasCellBlockSupport() {
2184    return this.rpcClient.hasCellBlockSupport();
2185  }
2186
2187  @Override
2188  public ConnectionConfiguration getConnectionConfiguration() {
2189    return this.connectionConfig;
2190  }
2191
2192  @Override
2193  public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
2194    return this.rpcCallerFactory;
2195  }
2196
2197  @Override
2198  public RpcControllerFactory getRpcControllerFactory() {
2199    return this.rpcControllerFactory;
2200  }
2201
2202  private static <T> T get(CompletableFuture<T> future) throws IOException {
2203    try {
2204      return future.get();
2205    } catch (InterruptedException e) {
2206      Thread.currentThread().interrupt();
2207      throw (IOException) new InterruptedIOException().initCause(e);
2208    } catch (ExecutionException e) {
2209      Throwable cause = e.getCause();
2210      Throwables.propagateIfPossible(cause, IOException.class);
2211      throw new IOException(cause);
2212    }
2213  }
2214
2215  @Override
2216  public String getClusterId() {
2217    try {
2218      return registry.getClusterId().get();
2219    } catch (InterruptedException | ExecutionException e) {
2220      LOG.error("Error fetching cluster ID: ", e);
2221    }
2222    return null;
2223  }
2224}