001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
022import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
023import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
026import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
027import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
028import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
029import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
030import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx;
031
032import edu.umd.cs.findbugs.annotations.Nullable;
033import java.io.Closeable;
034import java.io.IOException;
035import java.io.InterruptedIOException;
036import java.lang.reflect.UndeclaredThrowableException;
037import java.util.ArrayList;
038import java.util.Collections;
039import java.util.Date;
040import java.util.List;
041import java.util.concurrent.BlockingQueue;
042import java.util.concurrent.CompletableFuture;
043import java.util.concurrent.ConcurrentHashMap;
044import java.util.concurrent.ConcurrentMap;
045import java.util.concurrent.ExecutionException;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.LinkedBlockingQueue;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.atomic.AtomicInteger;
051import java.util.concurrent.locks.ReentrantLock;
052import org.apache.hadoop.conf.Configuration;
053import org.apache.hadoop.hbase.AuthUtil;
054import org.apache.hadoop.hbase.CallQueueTooBigException;
055import org.apache.hadoop.hbase.ChoreService;
056import org.apache.hadoop.hbase.DoNotRetryIOException;
057import org.apache.hadoop.hbase.HConstants;
058import org.apache.hadoop.hbase.HRegionLocation;
059import org.apache.hadoop.hbase.MasterNotRunningException;
060import org.apache.hadoop.hbase.MetaTableAccessor;
061import org.apache.hadoop.hbase.RegionLocations;
062import org.apache.hadoop.hbase.ServerName;
063import org.apache.hadoop.hbase.TableName;
064import org.apache.hadoop.hbase.TableNotEnabledException;
065import org.apache.hadoop.hbase.TableNotFoundException;
066import org.apache.hadoop.hbase.ZooKeeperConnectionException;
067import org.apache.hadoop.hbase.client.Scan.ReadType;
068import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
069import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
070import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
071import org.apache.hadoop.hbase.exceptions.RegionMovedException;
072import org.apache.hadoop.hbase.ipc.RpcClient;
073import org.apache.hadoop.hbase.ipc.RpcClientFactory;
074import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
075import org.apache.hadoop.hbase.log.HBaseMarkers;
076import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
077import org.apache.hadoop.hbase.security.User;
078import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
079import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
129import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
130import org.apache.hadoop.hbase.util.Bytes;
131import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
132import org.apache.hadoop.hbase.util.ExceptionUtil;
133import org.apache.hadoop.hbase.util.Pair;
134import org.apache.hadoop.hbase.util.ReflectionUtils;
135import org.apache.hadoop.hbase.util.Threads;
136import org.apache.hadoop.ipc.RemoteException;
137import org.apache.hadoop.security.UserGroupInformation;
138import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
139import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
140import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
141import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
142import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
143import org.apache.yetus.audience.InterfaceAudience;
144import org.apache.zookeeper.KeeperException;
145import org.slf4j.Logger;
146import org.slf4j.LoggerFactory;
147
148/**
149 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
150 * Encapsulates connection to zookeeper and regionservers.
151 */
152@edu.umd.cs.findbugs.annotations.SuppressWarnings(
153    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
154    justification="Access to the conncurrent hash map is under a lock so should be fine.")
155@InterfaceAudience.Private
156class ConnectionImplementation implements ClusterConnection, Closeable {
157  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
158  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
159
160  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
161
162  private final boolean hostnamesCanChange;
163  private final long pause;
164  private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
165  // The mode tells if HedgedRead, LoadBalance mode is supported.
166  // The default mode is CatalogReplicaMode.None.
167  private CatalogReplicaMode metaReplicaMode;
168  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
169
170  private final int metaReplicaCallTimeoutScanInMicroSecond;
171  private final int numTries;
172  final int rpcTimeout;
173
174  /**
175   * Global nonceGenerator shared per client.Currently there's no reason to limit its scope.
176   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
177   */
178  private static volatile NonceGenerator nonceGenerator = null;
179  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
180  private static final Object nonceGeneratorCreateLock = new Object();
181
182  private final AsyncProcess asyncProcess;
183  // single tracker per connection
184  private final ServerStatisticTracker stats;
185
186  private volatile boolean closed;
187  private volatile boolean aborted;
188
189  // package protected for the tests
190  ClusterStatusListener clusterStatusListener;
191
192  private final Object metaRegionLock = new Object();
193
194  private final Object masterLock = new Object();
195
196  // thread executor shared by all Table instances created
197  // by this connection
198  private volatile ThreadPoolExecutor batchPool = null;
199  // meta thread executor shared by all Table instances created
200  // by this connection
201  private volatile ThreadPoolExecutor metaLookupPool = null;
202  private volatile boolean cleanupPool = false;
203
204  private final Configuration conf;
205
206  // cache the configuration value for tables so that we can avoid calling
207  // the expensive Configuration to fetch the value multiple times.
208  private final ConnectionConfiguration connectionConfig;
209
210  // Client rpc instance.
211  private final RpcClient rpcClient;
212
213  private final MetaCache metaCache;
214  private final MetricsConnection metrics;
215
216  protected User user;
217
218  private final RpcRetryingCallerFactory rpcCallerFactory;
219
220  private final RpcControllerFactory rpcControllerFactory;
221
222  private final RetryingCallerInterceptor interceptor;
223
224  /**
225   * Cluster registry of basic info such as clusterid and meta region location.
226   */
227  private final ConnectionRegistry registry;
228
229  private final ClientBackoffPolicy backoffPolicy;
230
231  /**
232   * Allow setting an alternate BufferedMutator implementation via
233   * config. If null, use default.
234   */
235  private final String alternateBufferedMutatorClassName;
236
237  /** lock guards against multiple threads trying to query the meta region at the same time */
238  private final ReentrantLock userRegionLock = new ReentrantLock();
239
240  private ChoreService choreService;
241
/**
 * Creates the connection: caches configuration values, sets up the retrying caller machinery,
 * obtains the connection registry and cluster id, creates the rpc client, optionally starts the
 * cluster status listener and finally resolves the meta replica mode.
 * <p>
 * If anything fails once the registry has been requested, {@code close()} is invoked before the
 * failure is rethrown so that already created resources are not leaked.
 * @param conf Configuration object
 * @param pool executor stored as the batch pool; it is cast to {@link ThreadPoolExecutor}. May be
 *          null, in which case {@code getBatchPool()} creates a pool on first use.
 * @param user user the connection acts as; when it logged in from a keytab a renewal chore is
 *          scheduled
 * @throws IOException if setting up the connection fails
 */
ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
  this.conf = conf;
  this.user = user;
  if (user != null && user.isLoginFromKeytab()) {
    spawnRenewalChore(user.getUGI());
  }
  this.batchPool = (ThreadPoolExecutor) pool;
  this.connectionConfig = new ConnectionConfiguration(conf);
  this.closed = false;
  this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
      HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
  long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
  if (configuredPauseForCQTBE < pause) {
    // The CQTBE pause may only lengthen the normal pause, never shorten it.
    LOG.warn("The {} setting: {} is smaller than {}, will use {} instead.",
        HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, configuredPauseForCQTBE,
        HConstants.HBASE_CLIENT_PAUSE, pause);
    this.pauseForCQTBE = pause;
  } else {
    this.pauseForCQTBE = configuredPauseForCQTBE;
  }
  this.metaReplicaCallTimeoutScanInMicroSecond =
      connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();

  // how many times to try, one more than max *retry* time
  this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
  this.rpcTimeout = conf.getInt(
      HConstants.HBASE_RPC_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
    synchronized (nonceGeneratorCreateLock) {
      if (nonceGenerator == null) {
        nonceGenerator = PerClientRandomNonceGenerator.get();
      }
    }
  } else {
    // NOTE(review): this replaces the static generator without taking nonceGeneratorCreateLock,
    // so it also affects whatever else reads the static field - confirm this is intended.
    nonceGenerator = NO_NONCE_GENERATOR;
  }

  this.stats = ServerStatisticTracker.create(conf);
  this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
  this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
  this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
  this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
  this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
  if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
    this.metrics =
      new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
  } else {
    this.metrics = null;
  }
  this.metaCache = new MetaCache(this.metrics);

  boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
      HConstants.STATUS_PUBLISHED_DEFAULT);
  this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
  Class<? extends ClusterStatusListener.Listener> listenerClass =
      conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
          ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
          ClusterStatusListener.Listener.class);

  // Is there an alternate BufferedMutator to use?
  this.alternateBufferedMutatorClassName =
      this.conf.get(BufferedMutator.CLASSNAME_KEY);

  try {
    this.registry = ConnectionRegistryFactory.getRegistry(conf);
    retrieveClusterId();

    this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);

    // Do we publish the status?
    if (shouldListen) {
      if (listenerClass == null) {
        LOG.warn("{} is true, but {} is not set - not listening status",
            HConstants.STATUS_PUBLISHED, ClusterStatusListener.STATUS_LISTENER_CLASS);
      } else {
        clusterStatusListener = new ClusterStatusListener(
            new ClusterStatusListener.DeadServerHandler() {
              @Override
              public void newDead(ServerName sn) {
                clearCaches(sn);
                rpcClient.cancelConnections(sn);
              }
            }, conf, listenerClass);
      }
    }

    // Get the region locator's meta replica mode. This is done inside the try block so that a
    // failure here (for example while creating the selector) also triggers the cleanup below.
    this.metaReplicaMode = CatalogReplicaMode.fromString(conf.get(LOCATOR_META_REPLICAS_MODE,
      CatalogReplicaMode.NONE.toString()));

    switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        String replicaSelectorClass = conf.get(
          RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
          CatalogReplicaLoadBalanceSimpleSelector.class.getName());

        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
          replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> {
            // Falls back to a single replica when the meta locations cannot be fetched.
            int numOfReplicas = 1;
            try {
              RegionLocations metaLocations = registry.getMetaRegionLocations().get(
                connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
              numOfReplicas = metaLocations.size();
            } catch (Exception e) {
              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
            }
            return numOfReplicas;
          });
        break;
      case NONE:
        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.

        boolean useMetaReplicas = conf.getBoolean(USE_META_REPLICAS,
          DEFAULT_USE_META_REPLICAS);
        if (useMetaReplicas) {
          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
        }
        break;
      default:
        // Doing nothing
    }
  } catch (Throwable e) {
    // avoid leaks: registry, rpcClient, ...
    LOG.debug("connection construction failed", e);
    close();
    throw e;
  }
}
375
376  private void spawnRenewalChore(final UserGroupInformation user) {
377    ChoreService service = getChoreService();
378    service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
379  }
380
381  /**
382   * @param conn The connection for which to replace the generator.
383   * @param cnm Replaces the nonce generator used, for testing.
384   * @return old nonce generator.
385   */
386  static NonceGenerator injectNonceGeneratorForTesting(
387      ClusterConnection conn, NonceGenerator cnm) {
388    ConnectionImplementation connImpl = (ConnectionImplementation)conn;
389    NonceGenerator ng = connImpl.getNonceGenerator();
390    LOG.warn("Nonce generator is being replaced by test code for "
391      + cnm.getClass().getName());
392    nonceGenerator = cnm;
393    return ng;
394  }
395
396  @Override
397  public Table getTable(TableName tableName) throws IOException {
398    return getTable(tableName, getBatchPool());
399  }
400
401  @Override
402  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
403    return new TableBuilderBase(tableName, connectionConfig) {
404
405      @Override
406      public Table build() {
407        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
408            rpcControllerFactory, pool);
409      }
410    };
411  }
412
413  @Override
414  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
415    if (params.getTableName() == null) {
416      throw new IllegalArgumentException("TableName cannot be null.");
417    }
418    if (params.getPool() == null) {
419      params.pool(HTable.getDefaultExecutor(getConfiguration()));
420    }
421    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
422      params.writeBufferSize(connectionConfig.getWriteBufferSize());
423    }
424    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
425      params.setWriteBufferPeriodicFlushTimeoutMs(
426              connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
427    }
428    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
429      params.setWriteBufferPeriodicFlushTimerTickMs(
430              connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
431    }
432    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
433      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
434    }
435    // Look to see if an alternate BufferedMutation implementation is wanted.
436    // Look in params and in config. If null, use default.
437    String implementationClassName = params.getImplementationClassName();
438    if (implementationClassName == null) {
439      implementationClassName = this.alternateBufferedMutatorClassName;
440    }
441    if (implementationClassName == null) {
442      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
443    }
444    try {
445      return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
446          this, rpcCallerFactory, rpcControllerFactory, params);
447    } catch (ClassNotFoundException e) {
448      throw new RuntimeException(e);
449    }
450  }
451
452  @Override
453  public BufferedMutator getBufferedMutator(TableName tableName) {
454    return getBufferedMutator(new BufferedMutatorParams(tableName));
455  }
456
457  @Override
458  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
459    return new HRegionLocator(tableName, this);
460  }
461
462  @Override
463  public Admin getAdmin() throws IOException {
464    return new HBaseAdmin(this);
465  }
466
467  @Override
468  public Hbck getHbck() throws IOException {
469    return getHbck(get(registry.getActiveMaster()));
470  }
471
472  @Override
473  public Hbck getHbck(ServerName masterServer) throws IOException {
474    checkClosed();
475    if (isDeadServer(masterServer)) {
476      throw new RegionServerStoppedException(masterServer + " is dead.");
477    }
478    String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
479      masterServer, this.hostnamesCanChange);
480
481    return new HBaseHbck(
482      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
483        BlockingRpcChannel channel =
484          this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
485        return MasterProtos.HbckService.newBlockingStub(channel);
486      }), rpcControllerFactory);
487  }
488
489  @Override
490  public MetricsConnection getConnectionMetrics() {
491    return this.metrics;
492  }
493
494  private ThreadPoolExecutor getBatchPool() {
495    if (batchPool == null) {
496      synchronized (this) {
497        if (batchPool == null) {
498          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
499          this.batchPool = getThreadPool(threads, threads, "-shared", null);
500          this.cleanupPool = true;
501        }
502      }
503    }
504    return this.batchPool;
505  }
506
507  private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
508      BlockingQueue<Runnable> passedWorkQueue) {
509    // shared HTable thread executor not yet initialized
510    if (maxThreads == 0) {
511      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
512    }
513    if (coreThreads == 0) {
514      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
515    }
516    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
517    BlockingQueue<Runnable> workQueue = passedWorkQueue;
518    if (workQueue == null) {
519      workQueue =
520        new LinkedBlockingQueue<>(maxThreads *
521            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
522                HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
523      coreThreads = maxThreads;
524    }
525    ThreadPoolExecutor tpe =
526      new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
527        new ThreadFactoryBuilder().setNameFormat(toString() + nameHint + "-pool-%d")
528          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
529    tpe.allowCoreThreadTimeOut(true);
530    return tpe;
531  }
532
533  private ThreadPoolExecutor getMetaLookupPool() {
534    if (this.metaLookupPool == null) {
535      synchronized (this) {
536        if (this.metaLookupPool == null) {
537          //Some of the threads would be used for meta replicas
538          //To start with, threads.max.core threads can hit the meta (including replicas).
539          //After that, requests will get queued up in the passed queue, and only after
540          //the queue is full, a new thread will be started
541          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
542          this.metaLookupPool = getThreadPool(
543             threads,
544             threads,
545             "-metaLookup-shared-", new LinkedBlockingQueue<>());
546        }
547      }
548    }
549    return this.metaLookupPool;
550  }
551
552  protected ExecutorService getCurrentMetaLookupPool() {
553    return metaLookupPool;
554  }
555
556  protected ExecutorService getCurrentBatchPool() {
557    return batchPool;
558  }
559
560  private void shutdownPools() {
561    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
562      shutdownBatchPool(this.batchPool);
563    }
564    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
565      shutdownBatchPool(this.metaLookupPool);
566    }
567  }
568
569  private void shutdownBatchPool(ExecutorService pool) {
570    pool.shutdown();
571    try {
572      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
573        pool.shutdownNow();
574      }
575    } catch (InterruptedException e) {
576      pool.shutdownNow();
577    }
578  }
579
580  /**
581   * For tests only.
582   */
583  RpcClient getRpcClient() {
584    return rpcClient;
585  }
586
587  /**
588   * An identifier that will remain the same for a given connection.
589   */
590  @Override
591  public String toString(){
592    return "hconnection-0x" + Integer.toHexString(hashCode());
593  }
594
595  protected String clusterId = null;
596
597  protected void retrieveClusterId() {
598    if (clusterId != null) {
599      return;
600    }
601    try {
602      this.clusterId = this.registry.getClusterId().get();
603    } catch (InterruptedException | ExecutionException e) {
604      LOG.warn("Retrieve cluster id failed", e);
605    }
606    if (clusterId == null) {
607      clusterId = HConstants.CLUSTER_ID_DEFAULT;
608      LOG.debug("clusterid came back null, using default " + clusterId);
609    }
610  }
611
612  /**
613   * If choreService has not been created yet, create the ChoreService.
614   * @return ChoreService
615   */
616  synchronized ChoreService getChoreService() {
617    if (choreService == null) {
618      choreService = new ChoreService("AsyncConn Chore Service");
619    }
620    return choreService;
621  }
622
623  @Override
624  public Configuration getConfiguration() {
625    return this.conf;
626  }
627
628  private void checkClosed() throws DoNotRetryIOException {
629    if (this.closed) {
630      throw new DoNotRetryIOException(toString() + " closed");
631    }
632  }
633
634  /**
635   * @return true if the master is running, throws an exception otherwise
636   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
637   * @deprecated this has been deprecated without a replacement
638   */
639  @Deprecated
640  @Override
641  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
642    // When getting the master connection, we check it's running,
643    // so if there is no exception, it means we've been able to get a
644    // connection on a running master
645    MasterKeepAliveConnection m;
646    try {
647      m = getKeepAliveMasterService();
648    } catch (IOException e) {
649      throw new MasterNotRunningException(e);
650    }
651    m.close();
652    return true;
653  }
654
655  @Override
656  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
657      boolean reload) throws IOException {
658    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
659  }
660
661
662  @Override
663  public boolean isTableEnabled(TableName tableName) throws IOException {
664    return getTableState(tableName).inStates(TableState.State.ENABLED);
665  }
666
667  @Override
668  public boolean isTableDisabled(TableName tableName) throws IOException {
669    return getTableState(tableName).inStates(TableState.State.DISABLED);
670  }
671
672  @Override
673  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
674      throws IOException {
675    checkClosed();
676    try {
677      if (!isTableEnabled(tableName)) {
678        LOG.debug("Table {} not enabled", tableName);
679        return false;
680      }
681      if (TableName.isMetaTableName(tableName)) {
682        // meta table is always available
683        return true;
684      }
685      List<Pair<RegionInfo, ServerName>> locations =
686        MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);
687
688      int notDeployed = 0;
689      int regionCount = 0;
690      for (Pair<RegionInfo, ServerName> pair : locations) {
691        RegionInfo info = pair.getFirst();
692        if (pair.getSecond() == null) {
693          LOG.debug("Table {} has not deployed region {}", tableName,
694              pair.getFirst().getEncodedName());
695          notDeployed++;
696        } else if (splitKeys != null
697            && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
698          for (byte[] splitKey : splitKeys) {
699            // Just check if the splitkey is available
700            if (Bytes.equals(info.getStartKey(), splitKey)) {
701              regionCount++;
702              break;
703            }
704          }
705        } else {
706          // Always empty start row should be counted
707          regionCount++;
708        }
709      }
710      if (notDeployed > 0) {
711        if (LOG.isDebugEnabled()) {
712          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
713        }
714        return false;
715      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
716        if (LOG.isDebugEnabled()) {
717          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
718              splitKeys.length + 1, regionCount);
719        }
720        return false;
721      } else {
722        LOG.trace("Table {} should be available", tableName);
723        return true;
724      }
725    } catch (TableNotFoundException tnfe) {
726      LOG.warn("Table {} does not exist", tableName);
727      return false;
728    }
729  }
730
731  @Override
732  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
733    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
734      RegionInfo.getStartKey(regionName), false, true);
735    return locations == null ? null : locations.getRegionLocation();
736  }
737
738  private boolean isDeadServer(ServerName sn) {
739    if (clusterStatusListener == null) {
740      return false;
741    } else {
742      return clusterStatusListener.isDeadServer(sn);
743    }
744  }
745
746  @Override
747  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
748    return locateRegions(tableName, false, true);
749  }
750
751  @Override
752  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
753      boolean offlined) throws IOException {
754    List<RegionInfo> regions;
755    if (TableName.isMetaTableName(tableName)) {
756      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
757    } else {
758      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
759    }
760    List<HRegionLocation> locations = new ArrayList<>();
761    for (RegionInfo regionInfo : regions) {
762      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
763        continue;
764      }
765      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
766      if (list != null) {
767        for (HRegionLocation loc : list.getRegionLocations()) {
768          if (loc != null) {
769            locations.add(loc);
770          }
771        }
772      }
773    }
774    return locations;
775  }
776
777  @Override
778  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
779      throws IOException {
780    RegionLocations locations = locateRegion(tableName, row, true, true);
781    return locations == null ? null : locations.getRegionLocation();
782  }
783
784  @Override
785  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
786      throws IOException {
787    RegionLocations locations =
788      relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
789    return locations == null ? null
790      : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
791  }
792
793  @Override
794  public RegionLocations relocateRegion(final TableName tableName,
795      final byte [] row, int replicaId) throws IOException{
796    // Since this is an explicit request not to use any caching, finding
797    // disabled tables should not be desirable.  This will ensure that an exception is thrown when
798    // the first time a disabled table is interacted with.
799    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
800      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
801    }
802
803    return locateRegion(tableName, row, false, true, replicaId);
804  }
805
806  @Override
807  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
808      boolean retry) throws IOException {
809    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
810  }
811
812  @Override
813  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
814      boolean retry, int replicaId) throws IOException {
815    checkClosed();
816    if (tableName == null || tableName.getName().length == 0) {
817      throw new IllegalArgumentException("table name cannot be null or zero length");
818    }
819    if (tableName.equals(TableName.META_TABLE_NAME)) {
820      return locateMeta(tableName, useCache, replicaId);
821    } else {
822      // Region not in the cache - have to go to the meta RS
823      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
824    }
825  }
826
827  private RegionLocations locateMeta(final TableName tableName,
828      boolean useCache, int replicaId) throws IOException {
829    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
830    // zookeeper with one request for every region lookup. We cache the META with empty row
831    // key in MetaCache.
832    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
833    RegionLocations locations = null;
834    if (useCache) {
835      locations = getCachedLocation(tableName, metaCacheKey);
836      if (locations != null && locations.getRegionLocation(replicaId) != null) {
837        return locations;
838      }
839    }
840
841    // only one thread should do the lookup.
842    synchronized (metaRegionLock) {
843      // Check the cache again for a hit in case some other thread made the
844      // same query while we were waiting on the lock.
845      if (useCache) {
846        locations = getCachedLocation(tableName, metaCacheKey);
847        if (locations != null && locations.getRegionLocation(replicaId) != null) {
848          return locations;
849        }
850      }
851
852      // Look up from zookeeper
853      locations = get(this.registry.getMetaRegionLocations());
854      if (locations != null) {
855        cacheLocation(tableName, locations);
856      }
857    }
858    return locations;
859  }
860
861  /**
862   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
863   * seeking.
864   */
865  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
866      boolean retry, int replicaId) throws IOException {
867    // If we are supposed to be using the cache, look in the cache to see if we already have the
868    // region.
869    if (useCache) {
870      RegionLocations locations = getCachedLocation(tableName, row);
871      if (locations != null && locations.getRegionLocation(replicaId) != null) {
872        return locations;
873      }
874    }
875    // build the key of the meta region we should be looking for.
876    // the extra 9's on the end are necessary to allow "exact" matches
877    // without knowing the precise region names.
878    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
879    byte[] metaStopKey =
880      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
881    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
882      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
883      .setReadType(ReadType.PREAD);
884
885    switch (this.metaReplicaMode) {
886      case LOAD_BALANCE:
887        int metaReplicaId = this.metaReplicaSelector.select(tableName, row,
888          RegionLocateType.CURRENT);
889        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
890          // If the selector gives a non-primary meta replica region, then go with it.
891          // Otherwise, just go to primary in non-hedgedRead mode.
892          s.setConsistency(Consistency.TIMELINE);
893          s.setReplicaId(metaReplicaId);
894        }
895        break;
896      case HEDGED_READ:
897        s.setConsistency(Consistency.TIMELINE);
898        break;
899      default:
900        // do nothing
901    }
902    int maxAttempts = (retry ? numTries : 1);
903    boolean relocateMeta = false;
904    for (int tries = 0; ; tries++) {
905      if (tries >= maxAttempts) {
906        throw new NoServerForRegionException("Unable to find region for "
907            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
908      }
909      if (useCache) {
910        RegionLocations locations = getCachedLocation(tableName, row);
911        if (locations != null && locations.getRegionLocation(replicaId) != null) {
912          return locations;
913        }
914      } else {
915        // If we are not supposed to be using the cache, delete any existing cached location
916        // so it won't interfere.
917        // We are only supposed to clean the cache for the specific replicaId
918        metaCache.clearCache(tableName, row, replicaId);
919      }
920      // Query the meta region
921      long pauseBase = this.pause;
922      takeUserRegionLock();
923      try {
924        // We don't need to check if useCache is enabled or not. Even if useCache is false
925        // we already cleared the cache for this row before acquiring userRegion lock so if this
926        // row is present in cache that means some other thread has populated it while we were
927        // waiting to acquire user region lock.
928        RegionLocations locations = getCachedLocation(tableName, row);
929        if (locations != null && locations.getRegionLocation(replicaId) != null) {
930          return locations;
931        }
932        if (relocateMeta) {
933          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
934            RegionInfo.DEFAULT_REPLICA_ID);
935        }
936        s.resetMvccReadPoint();
937        try (ReversedClientScanner rcs =
938          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
939            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
940          boolean tableNotFound = true;
941          for (;;) {
942            Result regionInfoRow = rcs.next();
943            if (regionInfoRow == null) {
944              if (tableNotFound) {
945                throw new TableNotFoundException(tableName);
946              } else {
947                throw new IOException(
948                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
949              }
950            }
951            tableNotFound = false;
952            // convert the row result into the HRegionLocation we need!
953            locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
954            if (locations == null || locations.getRegionLocation(replicaId) == null) {
955              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
956            }
957            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
958            if (regionInfo == null) {
959              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
960                ", row=" + regionInfoRow);
961            }
962            // See HBASE-20182. It is possible that we locate to a split parent even after the
963            // children are online, so here we need to skip this region and go to the next one.
964            if (regionInfo.isSplitParent()) {
965              continue;
966            }
967            if (regionInfo.isOffline()) {
968              throw new RegionOfflineException("Region offline; disable table call? " +
969                  regionInfo.getRegionNameAsString());
970            }
971            // It is possible that the split children have not been online yet and we have skipped
972            // the parent in the above condition, so we may have already reached a region which does
973            // not contains us.
974            if (!regionInfo.containsRow(row)) {
975              throw new IOException(
976                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
977            }
978            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
979            if (serverName == null) {
980              throw new NoServerForRegionException("No server address listed in " +
981                TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
982                " containing row " + Bytes.toStringBinary(row));
983            }
984            if (isDeadServer(serverName)) {
985              throw new RegionServerStoppedException(
986                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
987                  " is managed by the server " + serverName + ", but it is dead.");
988            }
989            // Instantiate the location
990            cacheLocation(tableName, locations);
991            return locations;
992          }
993        }
994      } catch (TableNotFoundException e) {
995        // if we got this error, probably means the table just plain doesn't
996        // exist. rethrow the error immediately. this should always be coming
997        // from the HTable constructor.
998        throw e;
999      } catch (IOException e) {
1000        ExceptionUtil.rethrowIfInterrupt(e);
1001        if (e instanceof RemoteException) {
1002          e = ((RemoteException)e).unwrapRemoteException();
1003        }
1004        if (e instanceof CallQueueTooBigException) {
1005          // Give a special check on CallQueueTooBigException, see #HBASE-17114
1006          pauseBase = this.pauseForCQTBE;
1007        }
1008        if (tries < maxAttempts - 1) {
1009          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
1010            "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
1011        } else {
1012          throw e;
1013        }
1014        // Only relocate the parent region if necessary
1015        relocateMeta =
1016          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
1017      } finally {
1018        userRegionLock.unlock();
1019      }
1020      try{
1021        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
1022      } catch (InterruptedException e) {
1023        throw new InterruptedIOException("Giving up trying to location region in " +
1024          "meta: thread is interrupted.");
1025      }
1026    }
1027  }
1028
1029  void takeUserRegionLock() throws IOException {
1030    try {
1031      long waitTime = connectionConfig.getMetaOperationTimeout();
1032      if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
1033        throw new LockTimeoutException("Failed to get user region lock in"
1034            + waitTime + " ms. " + " for accessing meta region server.");
1035      }
1036    } catch (InterruptedException ie) {
1037      LOG.error("Interrupted while waiting for a lock", ie);
1038      throw ExceptionUtil.asInterrupt(ie);
1039    }
1040  }
1041
1042  /**
1043   * Put a newly discovered HRegionLocation into the cache.
1044   * @param tableName The table name.
1045   * @param location the new location
1046   */
1047  @Override
1048  public void cacheLocation(final TableName tableName, final RegionLocations location) {
1049    metaCache.cacheLocation(tableName, location);
1050  }
1051
1052  /**
1053   * Search the cache for a location that fits our table and row key.
1054   * Return null if no suitable region is located.
1055   * @return Null or region location found in cache.
1056   */
1057  RegionLocations getCachedLocation(final TableName tableName,
1058      final byte [] row) {
1059    return metaCache.getCachedLocation(tableName, row);
1060  }
1061
1062  public void clearRegionCache(final TableName tableName, byte[] row) {
1063    metaCache.clearCache(tableName, row);
1064  }
1065
1066  /*
1067   * Delete all cached entries of a table that maps to a specific location.
1068   */
1069  @Override
1070  public void clearCaches(final ServerName serverName) {
1071    metaCache.clearCache(serverName);
1072  }
1073
1074  @Override
1075  public void clearRegionLocationCache() {
1076    metaCache.clearCache();
1077  }
1078
1079  @Override
1080  public void clearRegionCache(final TableName tableName) {
1081    metaCache.clearCache(tableName);
1082  }
1083
1084  /**
1085   * Put a newly discovered HRegionLocation into the cache.
1086   * @param tableName The table name.
1087   * @param source the source of the new location, if it's not coming from meta
1088   * @param location the new location
1089   */
1090  private void cacheLocation(final TableName tableName, final ServerName source,
1091      final HRegionLocation location) {
1092    metaCache.cacheLocation(tableName, source, location);
1093  }
1094
  // Cache of RPC service stubs. Keys are built by getStubKey from the service name and the
  // target server; values are the service stub implementations, stored as Object because stubs
  // of different services share this one map and callers cast on retrieval.
  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
1097
1098  /**
1099   * State of the MasterService connection/setup.
1100   */
1101  static class MasterServiceState {
1102    Connection connection;
1103
1104    MasterProtos.MasterService.BlockingInterface stub;
1105    int userCount;
1106
1107    MasterServiceState(final Connection connection) {
1108      super();
1109      this.connection = connection;
1110    }
1111
1112    @Override
1113    public String toString() {
1114      return "MasterService";
1115    }
1116
1117    Object getStub() {
1118      return this.stub;
1119    }
1120
1121    void clearStub() {
1122      this.stub = null;
1123    }
1124
1125    boolean isMasterRunning() throws IOException {
1126      MasterProtos.IsMasterRunningResponse response = null;
1127      try {
1128        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1129      } catch (Exception e) {
1130        throw ProtobufUtil.handleRemoteException(e);
1131      }
1132      return response != null? response.getIsMasterRunning(): false;
1133    }
1134  }
1135
1136  /**
1137   * The record of errors for servers.
1138   */
1139  static class ServerErrorTracker {
1140    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
1141    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>();
1142    private final long canRetryUntil;
1143    private final int maxTries;// max number to try
1144    private final long startTrackingTime;
1145
1146    /**
1147     * Constructor
1148     * @param timeout how long to wait before timeout, in unit of millisecond
1149     * @param maxTries how many times to try
1150     */
1151    public ServerErrorTracker(long timeout, int maxTries) {
1152      this.maxTries = maxTries;
1153      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
1154      this.startTrackingTime = new Date().getTime();
1155    }
1156
1157    /**
1158     * We stop to retry when we have exhausted BOTH the number of tries and the time allocated.
1159     * @param numAttempt how many times we have tried by now
1160     */
1161    boolean canTryMore(int numAttempt) {
1162      // If there is a single try we must not take into account the time.
1163      return numAttempt < maxTries || (maxTries > 1 &&
1164          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
1165    }
1166
1167    /**
1168     * Calculates the back-off time for a retrying request to a particular server.
1169     *
1170     * @param server    The server in question.
1171     * @param basePause The default hci pause.
1172     * @return The time to wait before sending next request.
1173     */
1174    long calculateBackoffTime(ServerName server, long basePause) {
1175      long result;
1176      ServerErrors errorStats = errorsByServer.get(server);
1177      if (errorStats != null) {
1178        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
1179      } else {
1180        result = 0; // yes, if the server is not in our list we don't wait before retrying.
1181      }
1182      return result;
1183    }
1184
1185    /**
1186     * Reports that there was an error on the server to do whatever bean-counting necessary.
1187     * @param server The server in question.
1188     */
1189    void reportServerError(ServerName server) {
1190      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
1191    }
1192
1193    long getStartTrackingTime() {
1194      return startTrackingTime;
1195    }
1196
1197    /**
1198     * The record of errors for a server.
1199     */
1200    private static class ServerErrors {
1201      private final AtomicInteger retries = new AtomicInteger(0);
1202
1203      public int getCount() {
1204        return retries.get();
1205      }
1206
1207      public void addError() {
1208        retries.incrementAndGet();
1209      }
1210    }
1211  }
1212
1213  /**
1214   * Class to make a MasterServiceStubMaker stub.
1215   */
1216  private final class MasterServiceStubMaker {
1217
1218    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
1219        throws IOException {
1220      try {
1221        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1222      } catch (ServiceException e) {
1223        throw ProtobufUtil.handleRemoteException(e);
1224      }
1225    }
1226
1227    /**
1228     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
1229     * services nor their interfaces. Let the caller do appropriate casting.
1230     * @return A stub for master services.
1231     */
1232    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
1233        throws IOException, KeeperException {
1234      ServerName sn = get(registry.getActiveMaster());
1235      if (sn == null) {
1236        String msg = "ZooKeeper available but no active master location found";
1237        LOG.info(msg);
1238        throw new MasterNotRunningException(msg);
1239      }
1240      if (isDeadServer(sn)) {
1241        throw new MasterNotRunningException(sn + " is dead.");
1242      }
1243      // Use the security info interface name as our stub key
1244      String key =
1245          getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
1246      MasterProtos.MasterService.BlockingInterface stub =
1247          (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1248            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1249            return MasterProtos.MasterService.newBlockingStub(channel);
1250          });
1251      isMasterRunning(stub);
1252      return stub;
1253    }
1254
1255    /**
1256     * Create a stub against the master. Retry if necessary.
1257     * @return A stub to do <code>intf</code> against the master
1258     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
1259     */
1260    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
1261      // The lock must be at the beginning to prevent multiple master creations
1262      // (and leaks) in a multithread context
1263      synchronized (masterLock) {
1264        Exception exceptionCaught = null;
1265        if (!closed) {
1266          try {
1267            return makeStubNoRetries();
1268          } catch (IOException e) {
1269            exceptionCaught = e;
1270          } catch (KeeperException e) {
1271            exceptionCaught = e;
1272          }
1273          throw new MasterNotRunningException(exceptionCaught);
1274        } else {
1275          throw new DoNotRetryIOException("Connection was closed while trying to get master");
1276        }
1277      }
1278    }
1279  }
1280
1281  @Override
1282  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
1283    return getAdmin(get(registry.getActiveMaster()));
1284  }
1285
1286  @Override
1287  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
1288      throws IOException {
1289    checkClosed();
1290    if (isDeadServer(serverName)) {
1291      throw new RegionServerStoppedException(serverName + " is dead.");
1292    }
1293    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
1294      this.hostnamesCanChange);
1295    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1296      BlockingRpcChannel channel =
1297          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1298      return AdminProtos.AdminService.newBlockingStub(channel);
1299    });
1300  }
1301
1302  @Override
1303  public BlockingInterface getClient(ServerName serverName) throws IOException {
1304    checkClosed();
1305    if (isDeadServer(serverName)) {
1306      throw new RegionServerStoppedException(serverName + " is dead.");
1307    }
1308    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
1309      serverName, this.hostnamesCanChange);
1310    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1311      BlockingRpcChannel channel =
1312          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1313      return ClientProtos.ClientService.newBlockingStub(channel);
1314    });
1315  }
1316
  // State of the master service connection for this connection instance. Its stub and userCount
  // are updated while holding masterLock in getKeepAliveMasterService().
  final MasterServiceState masterServiceState = new MasterServiceState(this);
1318
1319  @Override
1320  public MasterKeepAliveConnection getMaster() throws IOException {
1321    return getKeepAliveMasterService();
1322  }
1323
1324  private void resetMasterServiceState(final MasterServiceState mss) {
1325    mss.userCount++;
1326  }
1327
1328  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
1329    synchronized (masterLock) {
1330      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1331        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1332        this.masterServiceState.stub = stubMaker.makeStub();
1333      }
1334      resetMasterServiceState(this.masterServiceState);
1335    }
1336    // Ugly delegation just so we can add in a Close method.
1337    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
1338    return new MasterKeepAliveConnection() {
1339      MasterServiceState mss = masterServiceState;
1340
1341      @Override
1342      public MasterProtos.AbortProcedureResponse abortProcedure(
1343          RpcController controller,
1344          MasterProtos.AbortProcedureRequest request) throws ServiceException {
1345        return stub.abortProcedure(controller, request);
1346      }
1347
1348      @Override
1349      public MasterProtos.GetProceduresResponse getProcedures(
1350          RpcController controller,
1351          MasterProtos.GetProceduresRequest request) throws ServiceException {
1352        return stub.getProcedures(controller, request);
1353      }
1354
1355      @Override
1356      public MasterProtos.GetLocksResponse getLocks(
1357          RpcController controller,
1358          MasterProtos.GetLocksRequest request) throws ServiceException {
1359        return stub.getLocks(controller, request);
1360      }
1361
1362      @Override
1363      public MasterProtos.AddColumnResponse addColumn(
1364          RpcController controller,
1365          MasterProtos.AddColumnRequest request) throws ServiceException {
1366        return stub.addColumn(controller, request);
1367      }
1368
1369      @Override
1370      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
1371          MasterProtos.DeleteColumnRequest request)
1372      throws ServiceException {
1373        return stub.deleteColumn(controller, request);
1374      }
1375
1376      @Override
1377      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
1378          MasterProtos.ModifyColumnRequest request)
1379      throws ServiceException {
1380        return stub.modifyColumn(controller, request);
1381      }
1382
1383      @Override
1384      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
1385          MasterProtos.MoveRegionRequest request) throws ServiceException {
1386        return stub.moveRegion(controller, request);
1387      }
1388
1389      @Override
1390      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(
1391          RpcController controller, MasterProtos.MergeTableRegionsRequest request)
1392          throws ServiceException {
1393        return stub.mergeTableRegions(controller, request);
1394      }
1395
1396      @Override
1397      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
1398          MasterProtos.AssignRegionRequest request) throws ServiceException {
1399        return stub.assignRegion(controller, request);
1400      }
1401
1402      @Override
1403      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
1404          MasterProtos.UnassignRegionRequest request) throws ServiceException {
1405        return stub.unassignRegion(controller, request);
1406      }
1407
1408      @Override
1409      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
1410          MasterProtos.OfflineRegionRequest request) throws ServiceException {
1411        return stub.offlineRegion(controller, request);
1412      }
1413
1414      @Override
1415      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
1416          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
1417        return stub.splitRegion(controller, request);
1418      }
1419
1420      @Override
1421      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
1422          MasterProtos.DeleteTableRequest request) throws ServiceException {
1423        return stub.deleteTable(controller, request);
1424      }
1425
1426      @Override
1427      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
1428          MasterProtos.TruncateTableRequest request) throws ServiceException {
1429        return stub.truncateTable(controller, request);
1430      }
1431
1432      @Override
1433      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
1434          MasterProtos.EnableTableRequest request) throws ServiceException {
1435        return stub.enableTable(controller, request);
1436      }
1437
1438      @Override
1439      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
1440          MasterProtos.DisableTableRequest request) throws ServiceException {
1441        return stub.disableTable(controller, request);
1442      }
1443
1444      @Override
1445      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
1446          MasterProtos.ModifyTableRequest request) throws ServiceException {
1447        return stub.modifyTable(controller, request);
1448      }
1449
1450      @Override
1451      public MasterProtos.CreateTableResponse createTable(RpcController controller,
1452          MasterProtos.CreateTableRequest request) throws ServiceException {
1453        return stub.createTable(controller, request);
1454      }
1455
1456      @Override
1457      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
1458          MasterProtos.ShutdownRequest request) throws ServiceException {
1459        return stub.shutdown(controller, request);
1460      }
1461
1462      @Override
1463      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
1464          MasterProtos.StopMasterRequest request) throws ServiceException {
1465        return stub.stopMaster(controller, request);
1466      }
1467
1468      @Override
1469      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1470          final RpcController controller,
1471          final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
1472        return stub.isMasterInMaintenanceMode(controller, request);
1473      }
1474
1475      @Override
1476      public MasterProtos.BalanceResponse balance(RpcController controller,
1477          MasterProtos.BalanceRequest request) throws ServiceException {
1478        return stub.balance(controller, request);
1479      }
1480
1481      @Override
1482      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(
1483          RpcController controller, MasterProtos.SetBalancerRunningRequest request)
1484          throws ServiceException {
1485        return stub.setBalancerRunning(controller, request);
1486      }
1487
1488      @Override
1489      public NormalizeResponse normalize(RpcController controller,
1490          NormalizeRequest request) throws ServiceException {
1491        return stub.normalize(controller, request);
1492      }
1493
1494      @Override
1495      public SetNormalizerRunningResponse setNormalizerRunning(
1496          RpcController controller, SetNormalizerRunningRequest request)
1497          throws ServiceException {
1498        return stub.setNormalizerRunning(controller, request);
1499      }
1500
1501      @Override
1502      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
1503          MasterProtos.RunCatalogScanRequest request) throws ServiceException {
1504        return stub.runCatalogScan(controller, request);
1505      }
1506
1507      @Override
1508      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
1509          RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
1510          throws ServiceException {
1511        return stub.enableCatalogJanitor(controller, request);
1512      }
1513
1514      @Override
1515      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1516          RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
1517          throws ServiceException {
1518        return stub.isCatalogJanitorEnabled(controller, request);
1519      }
1520
1521      @Override
1522      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
1523          MasterProtos.RunCleanerChoreRequest request)
1524          throws ServiceException {
1525        return stub.runCleanerChore(controller, request);
1526      }
1527
1528      @Override
1529      public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning(
1530          RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request)
1531          throws ServiceException {
1532        return stub.setCleanerChoreRunning(controller, request);
1533      }
1534
1535      @Override
1536      public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled(
1537          RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request)
1538          throws ServiceException {
1539        return stub.isCleanerChoreEnabled(controller, request);
1540      }
1541
1542      @Override
1543      public ClientProtos.CoprocessorServiceResponse execMasterService(
1544          RpcController controller, ClientProtos.CoprocessorServiceRequest request)
1545          throws ServiceException {
1546        return stub.execMasterService(controller, request);
1547      }
1548
1549      @Override
1550      public MasterProtos.SnapshotResponse snapshot(RpcController controller,
1551          MasterProtos.SnapshotRequest request) throws ServiceException {
1552        return stub.snapshot(controller, request);
1553      }
1554
1555      @Override
1556      public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
1557          RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
1558          throws ServiceException {
1559        return stub.getCompletedSnapshots(controller, request);
1560      }
1561
1562      @Override
1563      public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1564          MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
1565        return stub.deleteSnapshot(controller, request);
1566      }
1567
1568      @Override
1569      public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1570          MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
1571        return stub.isSnapshotDone(controller, request);
1572      }
1573
1574      @Override
1575      public MasterProtos.RestoreSnapshotResponse restoreSnapshot(
1576          RpcController controller, MasterProtos.RestoreSnapshotRequest request)
1577          throws ServiceException {
1578        return stub.restoreSnapshot(controller, request);
1579      }
1580
1581      @Override
1582      public MasterProtos.SetSnapshotCleanupResponse switchSnapshotCleanup(
1583          RpcController controller, MasterProtos.SetSnapshotCleanupRequest request)
1584          throws ServiceException {
1585        return stub.switchSnapshotCleanup(controller, request);
1586      }
1587
1588      @Override
1589      public MasterProtos.IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
1590          RpcController controller, MasterProtos.IsSnapshotCleanupEnabledRequest request)
1591          throws ServiceException {
1592        return stub.isSnapshotCleanupEnabled(controller, request);
1593      }
1594
1595      @Override
1596      public MasterProtos.ExecProcedureResponse execProcedure(
1597          RpcController controller, MasterProtos.ExecProcedureRequest request)
1598          throws ServiceException {
1599        return stub.execProcedure(controller, request);
1600      }
1601
1602      @Override
1603      public MasterProtos.ExecProcedureResponse execProcedureWithRet(
1604          RpcController controller, MasterProtos.ExecProcedureRequest request)
1605          throws ServiceException {
1606        return stub.execProcedureWithRet(controller, request);
1607      }
1608
1609      @Override
1610      public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
1611          MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
1612        return stub.isProcedureDone(controller, request);
1613      }
1614
1615      @Override
1616      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
1617          MasterProtos.GetProcedureResultRequest request) throws ServiceException {
1618        return stub.getProcedureResult(controller, request);
1619      }
1620
1621      @Override
1622      public MasterProtos.IsMasterRunningResponse isMasterRunning(
1623          RpcController controller, MasterProtos.IsMasterRunningRequest request)
1624          throws ServiceException {
1625        return stub.isMasterRunning(controller, request);
1626      }
1627
1628      @Override
1629      public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller,
1630          MasterProtos.ModifyNamespaceRequest request)
1631      throws ServiceException {
1632        return stub.modifyNamespace(controller, request);
1633      }
1634
1635      @Override
1636      public MasterProtos.CreateNamespaceResponse createNamespace(
1637          RpcController controller,
1638          MasterProtos.CreateNamespaceRequest request) throws ServiceException {
1639        return stub.createNamespace(controller, request);
1640      }
1641
1642      @Override
1643      public MasterProtos.DeleteNamespaceResponse deleteNamespace(
1644          RpcController controller,
1645          MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
1646        return stub.deleteNamespace(controller, request);
1647      }
1648
1649      @Override
1650      public MasterProtos.ListNamespacesResponse listNamespaces(
1651          RpcController controller,
1652          MasterProtos.ListNamespacesRequest request) throws ServiceException {
1653        return stub.listNamespaces(controller, request);
1654      }
1655
1656      @Override
1657      public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
1658          RpcController controller,
1659          MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
1660        return stub.getNamespaceDescriptor(controller, request);
1661      }
1662
1663      @Override
1664      public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
1665          RpcController controller,
1666          MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
1667        return stub.listNamespaceDescriptors(controller, request);
1668      }
1669
1670      @Override
1671      public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
1672          RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
1673              throws ServiceException {
1674        return stub.listTableDescriptorsByNamespace(controller, request);
1675      }
1676
1677      @Override
1678      public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
1679          RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
1680              throws ServiceException {
1681        return stub.listTableNamesByNamespace(controller, request);
1682      }
1683
1684      @Override
1685      public MasterProtos.GetTableStateResponse getTableState(
1686              RpcController controller, MasterProtos.GetTableStateRequest request)
1687              throws ServiceException {
1688        return stub.getTableState(controller, request);
1689      }
1690
1691      @Override
1692      public void close() {
1693        release(this.mss);
1694      }
1695
1696      @Override
1697      public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
1698          RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
1699          throws ServiceException {
1700        return stub.getSchemaAlterStatus(controller, request);
1701      }
1702
1703      @Override
1704      public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(
1705          RpcController controller, MasterProtos.GetTableDescriptorsRequest request)
1706          throws ServiceException {
1707        return stub.getTableDescriptors(controller, request);
1708      }
1709
1710      @Override
1711      public MasterProtos.GetTableNamesResponse getTableNames(
1712          RpcController controller, MasterProtos.GetTableNamesRequest request)
1713          throws ServiceException {
1714        return stub.getTableNames(controller, request);
1715      }
1716
1717      @Override
1718      public MasterProtos.GetClusterStatusResponse getClusterStatus(
1719          RpcController controller, MasterProtos.GetClusterStatusRequest request)
1720          throws ServiceException {
1721        return stub.getClusterStatus(controller, request);
1722      }
1723
1724      @Override
1725      public MasterProtos.SetQuotaResponse setQuota(
1726          RpcController controller, MasterProtos.SetQuotaRequest request)
1727          throws ServiceException {
1728        return stub.setQuota(controller, request);
1729      }
1730
1731      @Override
1732      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
1733          RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
1734          throws ServiceException {
1735        return stub.getLastMajorCompactionTimestamp(controller, request);
1736      }
1737
1738      @Override
1739      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
1740          RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
1741          throws ServiceException {
1742        return stub.getLastMajorCompactionTimestampForRegion(controller, request);
1743      }
1744
1745      @Override
1746      public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
1747          IsBalancerEnabledRequest request) throws ServiceException {
1748        return stub.isBalancerEnabled(controller, request);
1749      }
1750
1751      @Override
1752      public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
1753        RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request)
1754        throws ServiceException {
1755        return stub.setSplitOrMergeEnabled(controller, request);
1756      }
1757
1758      @Override
1759      public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
1760        RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request)
1761              throws ServiceException {
1762        return stub.isSplitOrMergeEnabled(controller, request);
1763      }
1764
1765      @Override
1766      public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
1767          IsNormalizerEnabledRequest request) throws ServiceException {
1768        return stub.isNormalizerEnabled(controller, request);
1769      }
1770
1771      @Override
1772      public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
1773          SecurityCapabilitiesRequest request) throws ServiceException {
1774        return stub.getSecurityCapabilities(controller, request);
1775      }
1776
1777      @Override
1778      public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
1779          AddReplicationPeerRequest request) throws ServiceException {
1780        return stub.addReplicationPeer(controller, request);
1781      }
1782
1783      @Override
1784      public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
1785          RemoveReplicationPeerRequest request) throws ServiceException {
1786        return stub.removeReplicationPeer(controller, request);
1787      }
1788
1789      @Override
1790      public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
1791          EnableReplicationPeerRequest request) throws ServiceException {
1792        return stub.enableReplicationPeer(controller, request);
1793      }
1794
1795      @Override
1796      public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
1797          DisableReplicationPeerRequest request) throws ServiceException {
1798        return stub.disableReplicationPeer(controller, request);
1799      }
1800
1801      @Override
1802      public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
1803          ListDecommissionedRegionServersRequest request) throws ServiceException {
1804        return stub.listDecommissionedRegionServers(controller, request);
1805      }
1806
1807      @Override
1808      public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
1809          DecommissionRegionServersRequest request) throws ServiceException {
1810        return stub.decommissionRegionServers(controller, request);
1811      }
1812
1813      @Override
1814      public RecommissionRegionServerResponse recommissionRegionServer(
1815          RpcController controller, RecommissionRegionServerRequest request)
1816          throws ServiceException {
1817        return stub.recommissionRegionServer(controller, request);
1818      }
1819
1820      @Override
1821      public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
1822          GetReplicationPeerConfigRequest request) throws ServiceException {
1823        return stub.getReplicationPeerConfig(controller, request);
1824      }
1825
1826      @Override
1827      public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig(
1828          RpcController controller, UpdateReplicationPeerConfigRequest request)
1829          throws ServiceException {
1830        return stub.updateReplicationPeerConfig(controller, request);
1831      }
1832
1833      @Override
1834      public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
1835          ListReplicationPeersRequest request) throws ServiceException {
1836        return stub.listReplicationPeers(controller, request);
1837      }
1838
1839      @Override
1840      public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
1841          RpcController controller, GetSpaceQuotaRegionSizesRequest request)
1842          throws ServiceException {
1843        return stub.getSpaceQuotaRegionSizes(controller, request);
1844      }
1845
1846      @Override
1847      public GetQuotaStatesResponse getQuotaStates(
1848          RpcController controller, GetQuotaStatesRequest request) throws ServiceException {
1849        return stub.getQuotaStates(controller, request);
1850      }
1851
1852      @Override
1853      public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller,
1854          MasterProtos.ClearDeadServersRequest request) throws ServiceException {
1855        return stub.clearDeadServers(controller, request);
1856      }
1857
1858      @Override
1859      public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
1860          SwitchRpcThrottleRequest request) throws ServiceException {
1861        return stub.switchRpcThrottle(controller, request);
1862      }
1863
1864      @Override
1865      public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller,
1866          IsRpcThrottleEnabledRequest request) throws ServiceException {
1867        return stub.isRpcThrottleEnabled(controller, request);
1868      }
1869
1870      @Override
1871      public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller,
1872          SwitchExceedThrottleQuotaRequest request) throws ServiceException {
1873        return stub.switchExceedThrottleQuota(controller, request);
1874      }
1875
1876      @Override
1877      public AccessControlProtos.GrantResponse grant(RpcController controller,
1878          AccessControlProtos.GrantRequest request) throws ServiceException {
1879        return stub.grant(controller, request);
1880      }
1881
1882      @Override
1883      public AccessControlProtos.RevokeResponse revoke(RpcController controller,
1884          AccessControlProtos.RevokeRequest request) throws ServiceException {
1885        return stub.revoke(controller, request);
1886      }
1887
1888      @Override
1889      public GetUserPermissionsResponse getUserPermissions(RpcController controller,
1890          GetUserPermissionsRequest request) throws ServiceException {
1891        return stub.getUserPermissions(controller, request);
1892      }
1893
1894      @Override
1895      public HasUserPermissionsResponse hasUserPermissions(RpcController controller,
1896          HasUserPermissionsRequest request) throws ServiceException {
1897        return stub.hasUserPermissions(controller, request);
1898      }
1899
1900      @Override
1901      public HBaseProtos.LogEntry getLogEntries(RpcController controller,
1902          HBaseProtos.LogRequest request) throws ServiceException {
1903        return stub.getLogEntries(controller, request);
1904      }
1905    };
1906  }
1907
1908  private static void release(MasterServiceState mss) {
1909    if (mss != null && mss.connection != null) {
1910      ((ConnectionImplementation)mss.connection).releaseMaster(mss);
1911    }
1912  }
1913
1914  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
1915    if (mss.getStub() == null){
1916      return false;
1917    }
1918    try {
1919      return mss.isMasterRunning();
1920    } catch (UndeclaredThrowableException e) {
1921      // It's somehow messy, but we can receive exceptions such as
1922      //  java.net.ConnectException but they're not declared. So we catch it...
1923      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
1924      return false;
1925    } catch (IOException se) {
1926      LOG.warn("Checking master connection", se);
1927      return false;
1928    }
1929  }
1930
1931  void releaseMaster(MasterServiceState mss) {
1932    if (mss.getStub() == null) {
1933      return;
1934    }
1935    synchronized (masterLock) {
1936      --mss.userCount;
1937    }
1938  }
1939
1940  private void closeMasterService(MasterServiceState mss) {
1941    if (mss.getStub() != null) {
1942      LOG.info("Closing master protocol: " + mss);
1943      mss.clearStub();
1944    }
1945    mss.userCount = 0;
1946  }
1947
1948  /**
1949   * Immediate close of the shared master. Can be by the delayed close or when closing the
1950   * connection itself.
1951   */
1952  private void closeMaster() {
1953    synchronized (masterLock) {
1954      closeMasterService(masterServiceState);
1955    }
1956  }
1957
1958  void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) {
1959    HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
1960    cacheLocation(hri.getTable(), source, newHrl);
1961  }
1962
1963  @Override
1964  public void deleteCachedRegionLocation(final HRegionLocation location) {
1965    metaCache.clearCache(location);
1966  }
1967
1968  /**
1969   * Update the location with the new value (if the exception is a RegionMovedException)
1970   * or delete it from the cache. Does nothing if we can be sure from the exception that
1971   * the location is still accurate, or if the cache has already been updated.
1972   * @param exception an object (to simplify user code) on which we will try to find a nested
1973   *   or wrapped or both RegionMovedException
1974   * @param source server that is the source of the location update.
1975   */
1976  @Override
1977  public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
1978    final Object exception, final ServerName source) {
1979    if (rowkey == null || tableName == null) {
1980      LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
1981          ", tableName=" + (tableName == null ? "null" : tableName));
1982      return;
1983    }
1984
1985    if (source == null) {
1986      // This should not happen, but let's secure ourselves.
1987      return;
1988    }
1989
1990    if (regionName == null) {
1991      // we do not know which region, so just remove the cache entry for the row and server
1992      if (metrics != null) {
1993        metrics.incrCacheDroppingExceptions(exception);
1994      }
1995      metaCache.clearCache(tableName, rowkey, source);
1996      return;
1997    }
1998
1999    // Is it something we have already updated?
2000    final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
2001    HRegionLocation oldLocation = null;
2002    if (oldLocations != null) {
2003      oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
2004    }
2005    if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
2006      // There is no such location in the cache (it's been removed already) or
2007      // the cache has already been refreshed with a different location.  => nothing to do
2008      return;
2009    }
2010
2011    RegionInfo regionInfo = oldLocation.getRegion();
2012    Throwable cause = ClientExceptionsUtil.findException(exception);
2013    if (cause != null) {
2014      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
2015        // We know that the region is still on this region server
2016        return;
2017      }
2018
2019      if (cause instanceof RegionMovedException) {
2020        RegionMovedException rme = (RegionMovedException) cause;
2021        if (LOG.isTraceEnabled()) {
2022          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
2023              rme.getHostname() + ":" + rme.getPort() +
2024              " according to " + source.getAddress());
2025        }
2026        // We know that the region is not anymore on this region server, but we know
2027        //  the new location.
2028        updateCachedLocation(
2029            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
2030        return;
2031      }
2032    }
2033
2034    if (metrics != null) {
2035      metrics.incrCacheDroppingExceptions(exception);
2036    }
2037
2038    // Tell metaReplicaSelector that the location is stale. It will create a stale entry
2039    // with timestamp internally. Next time the client looks up the same location,
2040    // it will pick a different meta replica region.
2041    if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
2042      metaReplicaSelector.onError(oldLocation);
2043    }
2044
2045    // If we're here, it means that can cannot be sure about the location, so we remove it from
2046    // the cache. Do not send the source because source can be a new server in the same host:port
2047    metaCache.clearCache(regionInfo);
2048  }
2049
2050  @Override
2051  public AsyncProcess getAsyncProcess() {
2052    return asyncProcess;
2053  }
2054
2055  @Override
2056  public ServerStatisticTracker getStatisticsTracker() {
2057    return this.stats;
2058  }
2059
2060  @Override
2061  public ClientBackoffPolicy getBackoffPolicy() {
2062    return this.backoffPolicy;
2063  }
2064
2065  /*
2066   * Return the number of cached region for a table. It will only be called
2067   * from a unit test.
2068   */
2069  int getNumberOfCachedRegionLocations(final TableName tableName) {
2070    return metaCache.getNumberOfCachedRegionLocations(tableName);
2071  }
2072
2073  @Override
2074  public void abort(final String msg, Throwable t) {
2075    if (t != null) {
2076      LOG.error(HBaseMarkers.FATAL, msg, t);
2077    } else {
2078      LOG.error(HBaseMarkers.FATAL, msg);
2079    }
2080    this.aborted = true;
2081    close();
2082    this.closed = true;
2083  }
2084
2085  @Override
2086  public boolean isClosed() {
2087    return this.closed;
2088  }
2089
2090  @Override
2091  public boolean isAborted(){
2092    return this.aborted;
2093  }
2094
  /**
   * Shuts this connection down. Returns immediately when the {@code closed} flag is already
   * set; otherwise closes the shared master service, shuts down the pools and (if present)
   * the metrics, sets the {@code closed} flag, and then closes the registry, clears the
   * cached stubs and shuts down the cluster status listener, RPC client and chore service
   * where they are non-null.
   */
  @Override
  public void close() {
    // Already closed: nothing more to do.
    if (this.closed) {
      return;
    }
    closeMaster();
    shutdownPools();
    if (this.metrics != null) {
      this.metrics.shutdown();
    }
    // The flag is set here, before the remaining resources below are released.
    this.closed = true;
    registry.close();
    this.stubs.clear();
    // The remaining components are only shut down when they are non-null.
    if (clusterStatusListener != null) {
      clusterStatusListener.close();
    }
    if (rpcClient != null) {
      rpcClient.close();
    }
    if (choreService != null) {
      choreService.shutdown();
    }
  }
2118
2119  /**
2120   * Close the connection for good. On the off chance that someone is unable to close
2121   * the connection, perhaps because it bailed out prematurely, the method
2122   * below will ensure that this instance is cleaned up.
2123   * Caveat: The JVM may take an unknown amount of time to call finalize on an
2124   * unreachable object, so our hope is that every consumer cleans up after
2125   * itself, like any good citizen.
2126   */
2127  @Override
2128  protected void finalize() throws Throwable {
2129    super.finalize();
2130    close();
2131  }
2132
2133  @Override
2134  public NonceGenerator getNonceGenerator() {
2135    return nonceGenerator;
2136  }
2137
2138  @Override
2139  public TableState getTableState(TableName tableName) throws IOException {
2140    checkClosed();
2141    TableState tableState = MetaTableAccessor.getTableState(this, tableName);
2142    if (tableState == null) {
2143      throw new TableNotFoundException(tableName);
2144    }
2145    return tableState;
2146  }
2147
2148  @Override
2149  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
2150    return RpcRetryingCallerFactory
2151        .instantiate(conf, this.interceptor, this.getStatisticsTracker());
2152  }
2153
2154  @Override
2155  public boolean hasCellBlockSupport() {
2156    return this.rpcClient.hasCellBlockSupport();
2157  }
2158
2159  @Override
2160  public ConnectionConfiguration getConnectionConfiguration() {
2161    return this.connectionConfig;
2162  }
2163
2164  @Override
2165  public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
2166    return this.rpcCallerFactory;
2167  }
2168
2169  @Override
2170  public RpcControllerFactory getRpcControllerFactory() {
2171    return this.rpcControllerFactory;
2172  }
2173
2174  private static <T> T get(CompletableFuture<T> future) throws IOException {
2175    try {
2176      return future.get();
2177    } catch (InterruptedException e) {
2178      Thread.currentThread().interrupt();
2179      throw (IOException) new InterruptedIOException().initCause(e);
2180    } catch (ExecutionException e) {
2181      Throwable cause = e.getCause();
2182      Throwables.propagateIfPossible(cause, IOException.class);
2183      throw new IOException(cause);
2184    }
2185  }
2186}