001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
022import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
023import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
024import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
025import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
026import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsentEx;
027
028import edu.umd.cs.findbugs.annotations.Nullable;
029import java.io.Closeable;
030import java.io.IOException;
031import java.io.InterruptedIOException;
032import java.lang.reflect.UndeclaredThrowableException;
033import java.util.ArrayList;
034import java.util.Collections;
035import java.util.Date;
036import java.util.List;
037import java.util.concurrent.BlockingQueue;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.ConcurrentMap;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.ExecutorService;
043import java.util.concurrent.LinkedBlockingQueue;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.atomic.AtomicInteger;
047import java.util.concurrent.locks.ReentrantLock;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.hbase.CallQueueTooBigException;
050import org.apache.hadoop.hbase.DoNotRetryIOException;
051import org.apache.hadoop.hbase.HConstants;
052import org.apache.hadoop.hbase.HRegionLocation;
053import org.apache.hadoop.hbase.MasterNotRunningException;
054import org.apache.hadoop.hbase.MetaTableAccessor;
055import org.apache.hadoop.hbase.RegionLocations;
056import org.apache.hadoop.hbase.ServerName;
057import org.apache.hadoop.hbase.TableName;
058import org.apache.hadoop.hbase.TableNotEnabledException;
059import org.apache.hadoop.hbase.TableNotFoundException;
060import org.apache.hadoop.hbase.ZooKeeperConnectionException;
061import org.apache.hadoop.hbase.client.Scan.ReadType;
062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
063import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
064import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
065import org.apache.hadoop.hbase.exceptions.RegionMovedException;
066import org.apache.hadoop.hbase.ipc.RpcClient;
067import org.apache.hadoop.hbase.ipc.RpcClientFactory;
068import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
069import org.apache.hadoop.hbase.log.HBaseMarkers;
070import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
071import org.apache.hadoop.hbase.security.User;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
074import org.apache.hadoop.hbase.util.ExceptionUtil;
075import org.apache.hadoop.hbase.util.Pair;
076import org.apache.hadoop.hbase.util.ReflectionUtils;
077import org.apache.hadoop.hbase.util.Threads;
078import org.apache.hadoop.ipc.RemoteException;
079import org.apache.yetus.audience.InterfaceAudience;
080import org.apache.zookeeper.KeeperException;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
085import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
086import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
087import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
088import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
089import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
090import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
116import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
117import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
118import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
119import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
120import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
121import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
122import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
123import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
124import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
125import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
126import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
127import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
128import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
129
130/**
131 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
132 * Encapsulates connection to zookeeper and regionservers.
133 */
134@edu.umd.cs.findbugs.annotations.SuppressWarnings(
135    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
136    justification="Access to the conncurrent hash map is under a lock so should be fine.")
137@InterfaceAudience.Private
138class ConnectionImplementation implements ClusterConnection, Closeable {
139  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
140  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
141
142  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
143
144  private final boolean hostnamesCanChange;
145  private final long pause;
146  private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
147  private boolean useMetaReplicas;
148  private final int metaReplicaCallTimeoutScanInMicroSecond;
149  private final int numTries;
150  final int rpcTimeout;
151
  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
156  private static volatile NonceGenerator nonceGenerator = null;
157  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
158  private static final Object nonceGeneratorCreateLock = new Object();
159
160  private final AsyncProcess asyncProcess;
161  // single tracker per connection
162  private final ServerStatisticTracker stats;
163
164  private volatile boolean closed;
165  private volatile boolean aborted;
166
167  // package protected for the tests
168  ClusterStatusListener clusterStatusListener;
169
170  private final Object metaRegionLock = new Object();
171
172  private final Object masterLock = new Object();
173
174  // thread executor shared by all Table instances created
175  // by this connection
176  private volatile ExecutorService batchPool = null;
177  // meta thread executor shared by all Table instances created
178  // by this connection
179  private volatile ExecutorService metaLookupPool = null;
180  private volatile boolean cleanupPool = false;
181
182  private final Configuration conf;
183
184  // cache the configuration value for tables so that we can avoid calling
185  // the expensive Configuration to fetch the value multiple times.
186  private final ConnectionConfiguration connectionConfig;
187
188  // Client rpc instance.
189  private final RpcClient rpcClient;
190
191  private final MetaCache metaCache;
192  private final MetricsConnection metrics;
193
194  protected User user;
195
196  private final RpcRetryingCallerFactory rpcCallerFactory;
197
198  private final RpcControllerFactory rpcControllerFactory;
199
200  private final RetryingCallerInterceptor interceptor;
201
202  /**
203   * Cluster registry of basic info such as clusterid and meta region location.
204   */
205  private final AsyncRegistry registry;
206
207  private final ClientBackoffPolicy backoffPolicy;
208
209  /**
210   * Allow setting an alternate BufferedMutator implementation via
211   * config. If null, use default.
212   */
213  private final String alternateBufferedMutatorClassName;
214
215  /** lock guards against multiple threads trying to query the meta region at the same time */
216  private final ReentrantLock userRegionLock = new ReentrantLock();
217
  /**
   * Builds a connection from the supplied configuration. Reads the client pause, retry and RPC
   * timeout settings, picks the nonce generator, creates the retrying-caller and RPC-controller
   * factories, the optional client-side metrics and the meta cache, and then obtains the registry,
   * retrieves the cluster id and creates the RPC client.
   * <p>
   * If anything is thrown while the registry, RPC client or cluster status listener is being set
   * up, {@code close()} is invoked before the error is rethrown.
   * @param conf Configuration object
   * @param pool executor stored as the batch pool; when {@code null}, {@code getBatchPool()}
   *          creates a shared one on first use
   * @param user user stored on this connection; it is passed along when RPC channels are created
   * @throws IOException declared by the signature; anything thrown during the guarded setup is
   *           rethrown after {@code close()}
   */
  ConnectionImplementation(Configuration conf,
                           ExecutorService pool, User user) throws IOException {
    this.conf = conf;
    this.user = user;
    this.batchPool = pool;
    this.connectionConfig = new ConnectionConfiguration(conf);
    this.closed = false;
    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    // The CallQueueTooBigException pause defaults to the normal pause and is never allowed to be
    // smaller than it.
    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
    if (configuredPauseForCQTBE < pause) {
      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
          + ", will use " + pause + " instead.");
      this.pauseForCQTBE = pause;
    } else {
      this.pauseForCQTBE = configuredPauseForCQTBE;
    }
    this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
      HConstants.DEFAULT_USE_META_REPLICAS);
    this.metaReplicaCallTimeoutScanInMicroSecond =
        connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();

    // how many times to try, one more than max *retry* time
    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
    this.rpcTimeout = conf.getInt(
        HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    // nonceGenerator is a static field, so whatever is assigned here is seen by every connection
    // in this JVM. The enabled path only sets it once (under the lock); the disabled path
    // overwrites it unconditionally and without taking the lock.
    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
      synchronized (nonceGeneratorCreateLock) {
        if (nonceGenerator == null) {
          nonceGenerator = PerClientRandomNonceGenerator.get();
        }
      }
    } else {
      nonceGenerator = NO_NONCE_GENERATOR;
    }

    this.stats = ServerStatisticTracker.create(conf);
    this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
    // Client-side metrics are opt-in; metrics stays null when they are disabled.
    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
      this.metrics = new MetricsConnection(this);
    } else {
      this.metrics = null;
    }
    this.metaCache = new MetaCache(this.metrics);

    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
    Class<? extends ClusterStatusListener.Listener> listenerClass =
        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
            ClusterStatusListener.Listener.class);

    // Is there an alternate BufferedMutator to use?
    this.alternateBufferedMutatorClassName =
        this.conf.get(BufferedMutator.CLASSNAME_KEY);

    try {
      this.registry = AsyncRegistryFactory.getRegistry(conf);
      // Must happen before the RPC client is created: the cluster id is passed to it.
      retrieveClusterId();

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);

      // Do we publish the status?
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          // When a server is reported dead, drop its cached state and cancel its connections.
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    } catch (Throwable e) {
      // avoid leaks: registry, rpcClient, ...
      LOG.debug("connection construction failed", e);
      close();
      throw e;
    }
  }
314
315  /**
316   * @param useMetaReplicas
317   */
318  @VisibleForTesting
319  void setUseMetaReplicas(final boolean useMetaReplicas) {
320    this.useMetaReplicas = useMetaReplicas;
321  }
322
323  /**
324   * @param conn The connection for which to replace the generator.
325   * @param cnm Replaces the nonce generator used, for testing.
326   * @return old nonce generator.
327   */
328  @VisibleForTesting
329  static NonceGenerator injectNonceGeneratorForTesting(
330      ClusterConnection conn, NonceGenerator cnm) {
331    ConnectionImplementation connImpl = (ConnectionImplementation)conn;
332    NonceGenerator ng = connImpl.getNonceGenerator();
333    LOG.warn("Nonce generator is being replaced by test code for "
334      + cnm.getClass().getName());
335    nonceGenerator = cnm;
336    return ng;
337  }
338
339  @Override
340  public Table getTable(TableName tableName) throws IOException {
341    return getTable(tableName, getBatchPool());
342  }
343
344  @Override
345  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
346    return new TableBuilderBase(tableName, connectionConfig) {
347
348      @Override
349      public Table build() {
350        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
351            rpcControllerFactory, pool);
352      }
353    };
354  }
355
356  @Override
357  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
358    if (params.getTableName() == null) {
359      throw new IllegalArgumentException("TableName cannot be null.");
360    }
361    if (params.getPool() == null) {
362      params.pool(HTable.getDefaultExecutor(getConfiguration()));
363    }
364    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
365      params.writeBufferSize(connectionConfig.getWriteBufferSize());
366    }
367    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
368      params.setWriteBufferPeriodicFlushTimeoutMs(
369              connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
370    }
371    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
372      params.setWriteBufferPeriodicFlushTimerTickMs(
373              connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
374    }
375    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
376      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
377    }
378    // Look to see if an alternate BufferedMutation implementation is wanted.
379    // Look in params and in config. If null, use default.
380    String implementationClassName = params.getImplementationClassName();
381    if (implementationClassName == null) {
382      implementationClassName = this.alternateBufferedMutatorClassName;
383    }
384    if (implementationClassName == null) {
385      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
386    }
387    try {
388      return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
389          this, rpcCallerFactory, rpcControllerFactory, params);
390    } catch (ClassNotFoundException e) {
391      throw new RuntimeException(e);
392    }
393  }
394
395  @Override
396  public BufferedMutator getBufferedMutator(TableName tableName) {
397    return getBufferedMutator(new BufferedMutatorParams(tableName));
398  }
399
400  @Override
401  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
402    return new HRegionLocator(tableName, this);
403  }
404
405  @Override
406  public Admin getAdmin() throws IOException {
407    return new HBaseAdmin(this);
408  }
409
410  @Override
411  public Hbck getHbck() throws IOException {
412    return getHbck(get(registry.getMasterAddress()));
413  }
414
415  @Override
416  public Hbck getHbck(ServerName masterServer) throws IOException {
417    checkClosed();
418    if (isDeadServer(masterServer)) {
419      throw new RegionServerStoppedException(masterServer + " is dead.");
420    }
421    String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
422      masterServer, this.hostnamesCanChange);
423
424    return new HBaseHbck(
425      (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
426        BlockingRpcChannel channel =
427          this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
428        return MasterProtos.HbckService.newBlockingStub(channel);
429      }), rpcControllerFactory);
430  }
431
432  @Override
433  public MetricsConnection getConnectionMetrics() {
434    return this.metrics;
435  }
436
437  private ExecutorService getBatchPool() {
438    if (batchPool == null) {
439      synchronized (this) {
440        if (batchPool == null) {
441          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
442          this.batchPool = getThreadPool(threads, threads, "-shared", null);
443          this.cleanupPool = true;
444        }
445      }
446    }
447    return this.batchPool;
448  }
449
450  private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
451      BlockingQueue<Runnable> passedWorkQueue) {
452    // shared HTable thread executor not yet initialized
453    if (maxThreads == 0) {
454      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
455    }
456    if (coreThreads == 0) {
457      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
458    }
459    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
460    BlockingQueue<Runnable> workQueue = passedWorkQueue;
461    if (workQueue == null) {
462      workQueue =
463        new LinkedBlockingQueue<>(maxThreads *
464            conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
465                HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
466      coreThreads = maxThreads;
467    }
468    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
469        coreThreads,
470        maxThreads,
471        keepAliveTime,
472        TimeUnit.SECONDS,
473        workQueue,
474        Threads.newDaemonThreadFactory(toString() + nameHint));
475    tpe.allowCoreThreadTimeOut(true);
476    return tpe;
477  }
478
479  private ExecutorService getMetaLookupPool() {
480    if (this.metaLookupPool == null) {
481      synchronized (this) {
482        if (this.metaLookupPool == null) {
483          //Some of the threads would be used for meta replicas
484          //To start with, threads.max.core threads can hit the meta (including replicas).
485          //After that, requests will get queued up in the passed queue, and only after
486          //the queue is full, a new thread will be started
487          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
488          this.metaLookupPool = getThreadPool(
489             threads,
490             threads,
491             "-metaLookup-shared-", new LinkedBlockingQueue<>());
492        }
493      }
494    }
495    return this.metaLookupPool;
496  }
497
498  protected ExecutorService getCurrentMetaLookupPool() {
499    return metaLookupPool;
500  }
501
502  protected ExecutorService getCurrentBatchPool() {
503    return batchPool;
504  }
505
506  private void shutdownPools() {
507    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
508      shutdownBatchPool(this.batchPool);
509    }
510    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
511      shutdownBatchPool(this.metaLookupPool);
512    }
513  }
514
515  private void shutdownBatchPool(ExecutorService pool) {
516    pool.shutdown();
517    try {
518      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
519        pool.shutdownNow();
520      }
521    } catch (InterruptedException e) {
522      pool.shutdownNow();
523    }
524  }
525
526  /**
527   * For tests only.
528   */
529  @VisibleForTesting
530  RpcClient getRpcClient() {
531    return rpcClient;
532  }
533
534  /**
535   * An identifier that will remain the same for a given connection.
536   */
537  @Override
538  public String toString(){
539    return "hconnection-0x" + Integer.toHexString(hashCode());
540  }
541
542  protected String clusterId = null;
543
544  protected void retrieveClusterId() {
545    if (clusterId != null) {
546      return;
547    }
548    try {
549      this.clusterId = this.registry.getClusterId().get();
550    } catch (InterruptedException | ExecutionException e) {
551      LOG.warn("Retrieve cluster id failed", e);
552    }
553    if (clusterId == null) {
554      clusterId = HConstants.CLUSTER_ID_DEFAULT;
555      LOG.debug("clusterid came back null, using default " + clusterId);
556    }
557  }
558
559  @Override
560  public Configuration getConfiguration() {
561    return this.conf;
562  }
563
564  private void checkClosed() throws DoNotRetryIOException {
565    if (this.closed) {
566      throw new DoNotRetryIOException(toString() + " closed");
567    }
568  }
569
570  /**
571   * @return true if the master is running, throws an exception otherwise
572   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
573   * @deprecated this has been deprecated without a replacement
574   */
575  @Deprecated
576  @Override
577  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
578    // When getting the master connection, we check it's running,
579    // so if there is no exception, it means we've been able to get a
580    // connection on a running master
581    MasterKeepAliveConnection m;
582    try {
583      m = getKeepAliveMasterService();
584    } catch (IOException e) {
585      throw new MasterNotRunningException(e);
586    }
587    m.close();
588    return true;
589  }
590
591  @Override
592  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
593      boolean reload) throws IOException {
594    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
595  }
596
597
598  @Override
599  public boolean isTableEnabled(TableName tableName) throws IOException {
600    return getTableState(tableName).inStates(TableState.State.ENABLED);
601  }
602
603  @Override
604  public boolean isTableDisabled(TableName tableName) throws IOException {
605    return getTableState(tableName).inStates(TableState.State.DISABLED);
606  }
607
608  @Override
609  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
610      throws IOException {
611    checkClosed();
612    try {
613      if (!isTableEnabled(tableName)) {
614        LOG.debug("Table {} not enabled", tableName);
615        return false;
616      }
617      if (TableName.isMetaTableName(tableName)) {
618        // meta table is always available
619        return true;
620      }
621      List<Pair<RegionInfo, ServerName>> locations =
622        MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);
623
624      int notDeployed = 0;
625      int regionCount = 0;
626      for (Pair<RegionInfo, ServerName> pair : locations) {
627        RegionInfo info = pair.getFirst();
628        if (pair.getSecond() == null) {
629          LOG.debug("Table {} has not deployed region {}", tableName,
630              pair.getFirst().getEncodedName());
631          notDeployed++;
632        } else if (splitKeys != null
633            && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
634          for (byte[] splitKey : splitKeys) {
635            // Just check if the splitkey is available
636            if (Bytes.equals(info.getStartKey(), splitKey)) {
637              regionCount++;
638              break;
639            }
640          }
641        } else {
642          // Always empty start row should be counted
643          regionCount++;
644        }
645      }
646      if (notDeployed > 0) {
647        if (LOG.isDebugEnabled()) {
648          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
649        }
650        return false;
651      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
652        if (LOG.isDebugEnabled()) {
653          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
654              splitKeys.length + 1, regionCount);
655        }
656        return false;
657      } else {
658        LOG.trace("Table {} should be available", tableName);
659        return true;
660      }
661    } catch (TableNotFoundException tnfe) {
662      LOG.warn("Table {} does not exist", tableName);
663      return false;
664    }
665  }
666
667  @Override
668  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
669    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
670      RegionInfo.getStartKey(regionName), false, true);
671    return locations == null ? null : locations.getRegionLocation();
672  }
673
674  private boolean isDeadServer(ServerName sn) {
675    if (clusterStatusListener == null) {
676      return false;
677    } else {
678      return clusterStatusListener.isDeadServer(sn);
679    }
680  }
681
682  @Override
683  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
684    return locateRegions(tableName, false, true);
685  }
686
687  @Override
688  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
689      boolean offlined) throws IOException {
690    List<RegionInfo> regions;
691    if (TableName.isMetaTableName(tableName)) {
692      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
693    } else {
694      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
695    }
696    List<HRegionLocation> locations = new ArrayList<>();
697    for (RegionInfo regionInfo : regions) {
698      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
699        continue;
700      }
701      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
702      if (list != null) {
703        for (HRegionLocation loc : list.getRegionLocations()) {
704          if (loc != null) {
705            locations.add(loc);
706          }
707        }
708      }
709    }
710    return locations;
711  }
712
713  @Override
714  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
715      throws IOException {
716    RegionLocations locations = locateRegion(tableName, row, true, true);
717    return locations == null ? null : locations.getRegionLocation();
718  }
719
720  @Override
721  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
722      throws IOException {
723    RegionLocations locations =
724      relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
725    return locations == null ? null
726      : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
727  }
728
729  @Override
730  public RegionLocations relocateRegion(final TableName tableName,
731      final byte [] row, int replicaId) throws IOException{
732    // Since this is an explicit request not to use any caching, finding
733    // disabled tables should not be desirable.  This will ensure that an exception is thrown when
734    // the first time a disabled table is interacted with.
735    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
736      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
737    }
738
739    return locateRegion(tableName, row, false, true, replicaId);
740  }
741
742  @Override
743  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
744      boolean retry) throws IOException {
745    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
746  }
747
748  @Override
749  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
750      boolean retry, int replicaId) throws IOException {
751    checkClosed();
752    if (tableName == null || tableName.getName().length == 0) {
753      throw new IllegalArgumentException("table name cannot be null or zero length");
754    }
755    if (tableName.equals(TableName.META_TABLE_NAME)) {
756      return locateMeta(tableName, useCache, replicaId);
757    } else {
758      // Region not in the cache - have to go to the meta RS
759      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
760    }
761  }
762
763  private RegionLocations locateMeta(final TableName tableName,
764      boolean useCache, int replicaId) throws IOException {
765    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
766    // zookeeper with one request for every region lookup. We cache the META with empty row
767    // key in MetaCache.
768    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
769    RegionLocations locations = null;
770    if (useCache) {
771      locations = getCachedLocation(tableName, metaCacheKey);
772      if (locations != null && locations.getRegionLocation(replicaId) != null) {
773        return locations;
774      }
775    }
776
777    // only one thread should do the lookup.
778    synchronized (metaRegionLock) {
779      // Check the cache again for a hit in case some other thread made the
780      // same query while we were waiting on the lock.
781      if (useCache) {
782        locations = getCachedLocation(tableName, metaCacheKey);
783        if (locations != null && locations.getRegionLocation(replicaId) != null) {
784          return locations;
785        }
786      }
787
788      // Look up from zookeeper
789      locations = get(this.registry.getMetaRegionLocation());
790      if (locations != null) {
791        cacheLocation(tableName, locations);
792      }
793    }
794    return locations;
795  }
796
797  /**
798   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
799   * seeking.
800   */
801  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
802      boolean retry, int replicaId) throws IOException {
803    // If we are supposed to be using the cache, look in the cache to see if we already have the
804    // region.
805    if (useCache) {
806      RegionLocations locations = getCachedLocation(tableName, row);
807      if (locations != null && locations.getRegionLocation(replicaId) != null) {
808        return locations;
809      }
810    }
811    // build the key of the meta region we should be looking for.
812    // the extra 9's on the end are necessary to allow "exact" matches
813    // without knowing the precise region names.
814    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
815    byte[] metaStopKey =
816      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
817    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
818      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
819      .setReadType(ReadType.PREAD);
820    if (this.useMetaReplicas) {
821      s.setConsistency(Consistency.TIMELINE);
822    }
823    int maxAttempts = (retry ? numTries : 1);
824    boolean relocateMeta = false;
825    for (int tries = 0; ; tries++) {
826      if (tries >= maxAttempts) {
827        throw new NoServerForRegionException("Unable to find region for "
828            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
829      }
830      if (useCache) {
831        RegionLocations locations = getCachedLocation(tableName, row);
832        if (locations != null && locations.getRegionLocation(replicaId) != null) {
833          return locations;
834        }
835      } else {
836        // If we are not supposed to be using the cache, delete any existing cached location
837        // so it won't interfere.
838        // We are only supposed to clean the cache for the specific replicaId
839        metaCache.clearCache(tableName, row, replicaId);
840      }
841      // Query the meta region
842      long pauseBase = this.pause;
843      userRegionLock.lock();
844      try {
845        if (useCache) {// re-check cache after get lock
846          RegionLocations locations = getCachedLocation(tableName, row);
847          if (locations != null && locations.getRegionLocation(replicaId) != null) {
848            return locations;
849          }
850        }
851        if (relocateMeta) {
852          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
853            RegionInfo.DEFAULT_REPLICA_ID);
854        }
855        s.resetMvccReadPoint();
856        try (ReversedClientScanner rcs =
857          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
858            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
859          boolean tableNotFound = true;
860          for (;;) {
861            Result regionInfoRow = rcs.next();
862            if (regionInfoRow == null) {
863              if (tableNotFound) {
864                throw new TableNotFoundException(tableName);
865              } else {
866                throw new IOException(
867                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
868              }
869            }
870            tableNotFound = false;
871            // convert the row result into the HRegionLocation we need!
872            RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
873            if (locations == null || locations.getRegionLocation(replicaId) == null) {
874              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
875            }
876            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
877            if (regionInfo == null) {
878              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
879                ", row=" + regionInfoRow);
880            }
881            // See HBASE-20182. It is possible that we locate to a split parent even after the
882            // children are online, so here we need to skip this region and go to the next one.
883            if (regionInfo.isSplitParent()) {
884              continue;
885            }
886            if (regionInfo.isOffline()) {
887              throw new RegionOfflineException("Region offline; disable table call? " +
888                  regionInfo.getRegionNameAsString());
889            }
890            // It is possible that the split children have not been online yet and we have skipped
891            // the parent in the above condition, so we may have already reached a region which does
892            // not contains us.
893            if (!regionInfo.containsRow(row)) {
894              throw new IOException(
895                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
896            }
897            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
898            if (serverName == null) {
899              throw new NoServerForRegionException("No server address listed in " +
900                TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
901                " containing row " + Bytes.toStringBinary(row));
902            }
903            if (isDeadServer(serverName)) {
904              throw new RegionServerStoppedException(
905                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
906                  " is managed by the server " + serverName + ", but it is dead.");
907            }
908            // Instantiate the location
909            cacheLocation(tableName, locations);
910            return locations;
911          }
912        }
913      } catch (TableNotFoundException e) {
914        // if we got this error, probably means the table just plain doesn't
915        // exist. rethrow the error immediately. this should always be coming
916        // from the HTable constructor.
917        throw e;
918      } catch (IOException e) {
919        ExceptionUtil.rethrowIfInterrupt(e);
920        if (e instanceof RemoteException) {
921          e = ((RemoteException)e).unwrapRemoteException();
922        }
923        if (e instanceof CallQueueTooBigException) {
924          // Give a special check on CallQueueTooBigException, see #HBASE-17114
925          pauseBase = this.pauseForCQTBE;
926        }
927        if (tries < maxAttempts - 1) {
928          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
929            "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
930        } else {
931          throw e;
932        }
933        // Only relocate the parent region if necessary
934        relocateMeta =
935          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
936      } finally {
937        userRegionLock.unlock();
938      }
939      try{
940        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
941      } catch (InterruptedException e) {
942        throw new InterruptedIOException("Giving up trying to location region in " +
943          "meta: thread is interrupted.");
944      }
945    }
946  }
947
948  /**
949   * Put a newly discovered HRegionLocation into the cache.
950   * @param tableName The table name.
951   * @param location the new location
952   */
953  @Override
954  public void cacheLocation(final TableName tableName, final RegionLocations location) {
955    metaCache.cacheLocation(tableName, location);
956  }
957
958  /**
959   * Search the cache for a location that fits our table and row key.
960   * Return null if no suitable region is located.
961   * @return Null or region location found in cache.
962   */
963  RegionLocations getCachedLocation(final TableName tableName,
964      final byte [] row) {
965    return metaCache.getCachedLocation(tableName, row);
966  }
967
968  public void clearRegionCache(final TableName tableName, byte[] row) {
969    metaCache.clearCache(tableName, row);
970  }
971
972  /*
973   * Delete all cached entries of a table that maps to a specific location.
974   */
975  @Override
976  public void clearCaches(final ServerName serverName) {
977    metaCache.clearCache(serverName);
978  }
979
980  @Override
981  public void clearRegionCache() {
982    metaCache.clearCache();
983  }
984
985  @Override
986  public void clearRegionCache(final TableName tableName) {
987    metaCache.clearCache(tableName);
988  }
989
990  /**
991   * Put a newly discovered HRegionLocation into the cache.
992   * @param tableName The table name.
993   * @param source the source of the new location, if it's not coming from meta
994   * @param location the new location
995   */
996  private void cacheLocation(final TableName tableName, final ServerName source,
997      final HRegionLocation location) {
998    metaCache.cacheLocation(tableName, source, location);
999  }
1000
  // Map keyed by service name + regionserver to service stub implementation.
  // Keys are built with getStubKey(...); values are stored untyped, so each reader casts the
  // entry to the blocking stub interface it asked for.
  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();
1003
1004  /**
1005   * State of the MasterService connection/setup.
1006   */
1007  static class MasterServiceState {
1008    Connection connection;
1009
1010    MasterProtos.MasterService.BlockingInterface stub;
1011    int userCount;
1012
1013    MasterServiceState(final Connection connection) {
1014      super();
1015      this.connection = connection;
1016    }
1017
1018    @Override
1019    public String toString() {
1020      return "MasterService";
1021    }
1022
1023    Object getStub() {
1024      return this.stub;
1025    }
1026
1027    void clearStub() {
1028      this.stub = null;
1029    }
1030
1031    boolean isMasterRunning() throws IOException {
1032      MasterProtos.IsMasterRunningResponse response = null;
1033      try {
1034        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1035      } catch (Exception e) {
1036        throw ProtobufUtil.handleRemoteException(e);
1037      }
1038      return response != null? response.getIsMasterRunning(): false;
1039    }
1040  }
1041
1042  /**
1043   * The record of errors for servers.
1044   */
1045  static class ServerErrorTracker {
1046    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
1047    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>();
1048    private final long canRetryUntil;
1049    private final int maxTries;// max number to try
1050    private final long startTrackingTime;
1051
1052    /**
1053     * Constructor
1054     * @param timeout how long to wait before timeout, in unit of millisecond
1055     * @param maxTries how many times to try
1056     */
1057    public ServerErrorTracker(long timeout, int maxTries) {
1058      this.maxTries = maxTries;
1059      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
1060      this.startTrackingTime = new Date().getTime();
1061    }
1062
1063    /**
1064     * We stop to retry when we have exhausted BOTH the number of tries and the time allocated.
1065     * @param numAttempt how many times we have tried by now
1066     */
1067    boolean canTryMore(int numAttempt) {
1068      // If there is a single try we must not take into account the time.
1069      return numAttempt < maxTries || (maxTries > 1 &&
1070          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
1071    }
1072
1073    /**
1074     * Calculates the back-off time for a retrying request to a particular server.
1075     *
1076     * @param server    The server in question.
1077     * @param basePause The default hci pause.
1078     * @return The time to wait before sending next request.
1079     */
1080    long calculateBackoffTime(ServerName server, long basePause) {
1081      long result;
1082      ServerErrors errorStats = errorsByServer.get(server);
1083      if (errorStats != null) {
1084        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
1085      } else {
1086        result = 0; // yes, if the server is not in our list we don't wait before retrying.
1087      }
1088      return result;
1089    }
1090
1091    /**
1092     * Reports that there was an error on the server to do whatever bean-counting necessary.
1093     * @param server The server in question.
1094     */
1095    void reportServerError(ServerName server) {
1096      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
1097    }
1098
1099    long getStartTrackingTime() {
1100      return startTrackingTime;
1101    }
1102
1103    /**
1104     * The record of errors for a server.
1105     */
1106    private static class ServerErrors {
1107      private final AtomicInteger retries = new AtomicInteger(0);
1108
1109      public int getCount() {
1110        return retries.get();
1111      }
1112
1113      public void addError() {
1114        retries.incrementAndGet();
1115      }
1116    }
1117  }
1118
1119  /**
1120   * Class to make a MasterServiceStubMaker stub.
1121   */
1122  private final class MasterServiceStubMaker {
1123
1124    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
1125        throws IOException {
1126      try {
1127        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
1128      } catch (ServiceException e) {
1129        throw ProtobufUtil.handleRemoteException(e);
1130      }
1131    }
1132
1133    /**
1134     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
1135     * services nor their interfaces. Let the caller do appropriate casting.
1136     * @return A stub for master services.
1137     */
1138    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
1139        throws IOException, KeeperException {
1140      ServerName sn = get(registry.getMasterAddress());
1141      if (sn == null) {
1142        String msg = "ZooKeeper available but no active master location found";
1143        LOG.info(msg);
1144        throw new MasterNotRunningException(msg);
1145      }
1146      if (isDeadServer(sn)) {
1147        throw new MasterNotRunningException(sn + " is dead.");
1148      }
1149      // Use the security info interface name as our stub key
1150      String key =
1151          getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
1152      MasterProtos.MasterService.BlockingInterface stub =
1153          (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1154            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
1155            return MasterProtos.MasterService.newBlockingStub(channel);
1156          });
1157      isMasterRunning(stub);
1158      return stub;
1159    }
1160
1161    /**
1162     * Create a stub against the master. Retry if necessary.
1163     * @return A stub to do <code>intf</code> against the master
1164     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
1165     */
1166    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
1167      // The lock must be at the beginning to prevent multiple master creations
1168      // (and leaks) in a multithread context
1169      synchronized (masterLock) {
1170        Exception exceptionCaught = null;
1171        if (!closed) {
1172          try {
1173            return makeStubNoRetries();
1174          } catch (IOException e) {
1175            exceptionCaught = e;
1176          } catch (KeeperException e) {
1177            exceptionCaught = e;
1178          }
1179          throw new MasterNotRunningException(exceptionCaught);
1180        } else {
1181          throw new DoNotRetryIOException("Connection was closed while trying to get master");
1182        }
1183      }
1184    }
1185  }
1186
1187  @Override
1188  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
1189    return getAdmin(get(registry.getMasterAddress()));
1190  }
1191
1192  @Override
1193  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
1194      throws IOException {
1195    checkClosed();
1196    if (isDeadServer(serverName)) {
1197      throw new RegionServerStoppedException(serverName + " is dead.");
1198    }
1199    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
1200      this.hostnamesCanChange);
1201    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1202      BlockingRpcChannel channel =
1203          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1204      return AdminProtos.AdminService.newBlockingStub(channel);
1205    });
1206  }
1207
1208  @Override
1209  public BlockingInterface getClient(ServerName serverName) throws IOException {
1210    checkClosed();
1211    if (isDeadServer(serverName)) {
1212      throw new RegionServerStoppedException(serverName + " is dead.");
1213    }
1214    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
1215      serverName, this.hostnamesCanChange);
1216    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
1217      BlockingRpcChannel channel =
1218          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
1219      return ClientProtos.ClientService.newBlockingStub(channel);
1220    });
1221  }
1222
  // State of this connection's MasterService stub. getKeepAliveMasterService() replaces the stub
  // and bumps the user count while holding masterLock.
  final MasterServiceState masterServiceState = new MasterServiceState(this);
1224
1225  @Override
1226  public MasterKeepAliveConnection getMaster() throws IOException {
1227    return getKeepAliveMasterService();
1228  }
1229
1230  private void resetMasterServiceState(final MasterServiceState mss) {
1231    mss.userCount++;
1232  }
1233
1234  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
1235    synchronized (masterLock) {
1236      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1237        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1238        this.masterServiceState.stub = stubMaker.makeStub();
1239      }
1240      resetMasterServiceState(this.masterServiceState);
1241    }
1242    // Ugly delegation just so we can add in a Close method.
1243    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
1244    return new MasterKeepAliveConnection() {
1245      MasterServiceState mss = masterServiceState;
1246
1247      @Override
1248      public MasterProtos.AbortProcedureResponse abortProcedure(
1249          RpcController controller,
1250          MasterProtos.AbortProcedureRequest request) throws ServiceException {
1251        return stub.abortProcedure(controller, request);
1252      }
1253
1254      @Override
1255      public MasterProtos.GetProceduresResponse getProcedures(
1256          RpcController controller,
1257          MasterProtos.GetProceduresRequest request) throws ServiceException {
1258        return stub.getProcedures(controller, request);
1259      }
1260
1261      @Override
1262      public MasterProtos.GetLocksResponse getLocks(
1263          RpcController controller,
1264          MasterProtos.GetLocksRequest request) throws ServiceException {
1265        return stub.getLocks(controller, request);
1266      }
1267
1268      @Override
1269      public MasterProtos.AddColumnResponse addColumn(
1270          RpcController controller,
1271          MasterProtos.AddColumnRequest request) throws ServiceException {
1272        return stub.addColumn(controller, request);
1273      }
1274
1275      @Override
1276      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
1277          MasterProtos.DeleteColumnRequest request)
1278      throws ServiceException {
1279        return stub.deleteColumn(controller, request);
1280      }
1281
1282      @Override
1283      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
1284          MasterProtos.ModifyColumnRequest request)
1285      throws ServiceException {
1286        return stub.modifyColumn(controller, request);
1287      }
1288
1289      @Override
1290      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
1291          MasterProtos.MoveRegionRequest request) throws ServiceException {
1292        return stub.moveRegion(controller, request);
1293      }
1294
1295      @Override
1296      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(
1297          RpcController controller, MasterProtos.MergeTableRegionsRequest request)
1298          throws ServiceException {
1299        return stub.mergeTableRegions(controller, request);
1300      }
1301
1302      @Override
1303      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
1304          MasterProtos.AssignRegionRequest request) throws ServiceException {
1305        return stub.assignRegion(controller, request);
1306      }
1307
1308      @Override
1309      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
1310          MasterProtos.UnassignRegionRequest request) throws ServiceException {
1311        return stub.unassignRegion(controller, request);
1312      }
1313
1314      @Override
1315      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
1316          MasterProtos.OfflineRegionRequest request) throws ServiceException {
1317        return stub.offlineRegion(controller, request);
1318      }
1319
1320      @Override
1321      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
1322          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
1323        return stub.splitRegion(controller, request);
1324      }
1325
1326      @Override
1327      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
1328          MasterProtos.DeleteTableRequest request) throws ServiceException {
1329        return stub.deleteTable(controller, request);
1330      }
1331
1332      @Override
1333      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
1334          MasterProtos.TruncateTableRequest request) throws ServiceException {
1335        return stub.truncateTable(controller, request);
1336      }
1337
1338      @Override
1339      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
1340          MasterProtos.EnableTableRequest request) throws ServiceException {
1341        return stub.enableTable(controller, request);
1342      }
1343
1344      @Override
1345      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
1346          MasterProtos.DisableTableRequest request) throws ServiceException {
1347        return stub.disableTable(controller, request);
1348      }
1349
1350      @Override
1351      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
1352          MasterProtos.ModifyTableRequest request) throws ServiceException {
1353        return stub.modifyTable(controller, request);
1354      }
1355
1356      @Override
1357      public MasterProtos.CreateTableResponse createTable(RpcController controller,
1358          MasterProtos.CreateTableRequest request) throws ServiceException {
1359        return stub.createTable(controller, request);
1360      }
1361
1362      @Override
1363      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
1364          MasterProtos.ShutdownRequest request) throws ServiceException {
1365        return stub.shutdown(controller, request);
1366      }
1367
1368      @Override
1369      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
1370          MasterProtos.StopMasterRequest request) throws ServiceException {
1371        return stub.stopMaster(controller, request);
1372      }
1373
1374      @Override
1375      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
1376          final RpcController controller,
1377          final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
1378        return stub.isMasterInMaintenanceMode(controller, request);
1379      }
1380
1381      @Override
1382      public MasterProtos.BalanceResponse balance(RpcController controller,
1383          MasterProtos.BalanceRequest request) throws ServiceException {
1384        return stub.balance(controller, request);
1385      }
1386
1387      @Override
1388      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(
1389          RpcController controller, MasterProtos.SetBalancerRunningRequest request)
1390          throws ServiceException {
1391        return stub.setBalancerRunning(controller, request);
1392      }
1393
1394      @Override
1395      public NormalizeResponse normalize(RpcController controller,
1396          NormalizeRequest request) throws ServiceException {
1397        return stub.normalize(controller, request);
1398      }
1399
1400      @Override
1401      public SetNormalizerRunningResponse setNormalizerRunning(
1402          RpcController controller, SetNormalizerRunningRequest request)
1403          throws ServiceException {
1404        return stub.setNormalizerRunning(controller, request);
1405      }
1406
1407      @Override
1408      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
1409          MasterProtos.RunCatalogScanRequest request) throws ServiceException {
1410        return stub.runCatalogScan(controller, request);
1411      }
1412
1413      @Override
1414      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
1415          RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
1416          throws ServiceException {
1417        return stub.enableCatalogJanitor(controller, request);
1418      }
1419
1420      @Override
1421      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
1422          RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
1423          throws ServiceException {
1424        return stub.isCatalogJanitorEnabled(controller, request);
1425      }
1426
1427      @Override
1428      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
1429          MasterProtos.RunCleanerChoreRequest request)
1430          throws ServiceException {
1431        return stub.runCleanerChore(controller, request);
1432      }
1433
1434      @Override
1435      public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning(
1436          RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request)
1437          throws ServiceException {
1438        return stub.setCleanerChoreRunning(controller, request);
1439      }
1440
1441      @Override
1442      public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled(
1443          RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request)
1444          throws ServiceException {
1445        return stub.isCleanerChoreEnabled(controller, request);
1446      }
1447
1448      @Override
1449      public ClientProtos.CoprocessorServiceResponse execMasterService(
1450          RpcController controller, ClientProtos.CoprocessorServiceRequest request)
1451          throws ServiceException {
1452        return stub.execMasterService(controller, request);
1453      }
1454
1455      @Override
1456      public MasterProtos.SnapshotResponse snapshot(RpcController controller,
1457          MasterProtos.SnapshotRequest request) throws ServiceException {
1458        return stub.snapshot(controller, request);
1459      }
1460
1461      @Override
1462      public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
1463          RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
1464          throws ServiceException {
1465        return stub.getCompletedSnapshots(controller, request);
1466      }
1467
1468      @Override
1469      public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
1470          MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
1471        return stub.deleteSnapshot(controller, request);
1472      }
1473
1474      @Override
1475      public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
1476          MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
1477        return stub.isSnapshotDone(controller, request);
1478      }
1479
1480      @Override
1481      public MasterProtos.RestoreSnapshotResponse restoreSnapshot(
1482          RpcController controller, MasterProtos.RestoreSnapshotRequest request)
1483          throws ServiceException {
1484        return stub.restoreSnapshot(controller, request);
1485      }
1486
      /**
       * Delegates to {@code stub.execProcedure} and returns its response unchanged.
       */
      @Override
      public MasterProtos.ExecProcedureResponse execProcedure(
          RpcController controller, MasterProtos.ExecProcedureRequest request)
          throws ServiceException {
        return stub.execProcedure(controller, request);
      }
1493
      /**
       * Delegates to {@code stub.execProcedureWithRet} and returns its response unchanged.
       */
      @Override
      public MasterProtos.ExecProcedureResponse execProcedureWithRet(
          RpcController controller, MasterProtos.ExecProcedureRequest request)
          throws ServiceException {
        return stub.execProcedureWithRet(controller, request);
      }
1500
      /**
       * Delegates to {@code stub.isProcedureDone} and returns its response unchanged.
       */
      @Override
      public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
          MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
        return stub.isProcedureDone(controller, request);
      }
1506
      /**
       * Delegates to {@code stub.getProcedureResult} and returns its response unchanged.
       */
      @Override
      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller,
          MasterProtos.GetProcedureResultRequest request) throws ServiceException {
        return stub.getProcedureResult(controller, request);
      }
1512
      /**
       * Delegates to {@code stub.isMasterRunning} and returns its response unchanged.
       */
      @Override
      public MasterProtos.IsMasterRunningResponse isMasterRunning(
          RpcController controller, MasterProtos.IsMasterRunningRequest request)
          throws ServiceException {
        return stub.isMasterRunning(controller, request);
      }
1519
1520      @Override
1521      public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller,
1522          MasterProtos.ModifyNamespaceRequest request)
1523      throws ServiceException {
1524        return stub.modifyNamespace(controller, request);
1525      }
1526
      /**
       * Delegates to {@code stub.createNamespace} and returns its response unchanged.
       */
      @Override
      public MasterProtos.CreateNamespaceResponse createNamespace(
          RpcController controller,
          MasterProtos.CreateNamespaceRequest request) throws ServiceException {
        return stub.createNamespace(controller, request);
      }
1533
      /**
       * Delegates to {@code stub.deleteNamespace} and returns its response unchanged.
       */
      @Override
      public MasterProtos.DeleteNamespaceResponse deleteNamespace(
          RpcController controller,
          MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
        return stub.deleteNamespace(controller, request);
      }
1540
      /**
       * Delegates to {@code stub.getNamespaceDescriptor} and returns its response unchanged.
       */
      @Override
      public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
          RpcController controller,
          MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
        return stub.getNamespaceDescriptor(controller, request);
      }
1547
      /**
       * Delegates to {@code stub.listNamespaceDescriptors} and returns its response unchanged.
       */
      @Override
      public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
          RpcController controller,
          MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
        return stub.listNamespaceDescriptors(controller, request);
      }
1554
1555      @Override
1556      public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
1557          RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
1558              throws ServiceException {
1559        return stub.listTableDescriptorsByNamespace(controller, request);
1560      }
1561
1562      @Override
1563      public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
1564          RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
1565              throws ServiceException {
1566        return stub.listTableNamesByNamespace(controller, request);
1567      }
1568
1569      @Override
1570      public MasterProtos.GetTableStateResponse getTableState(
1571              RpcController controller, MasterProtos.GetTableStateRequest request)
1572              throws ServiceException {
1573        return stub.getTableState(controller, request);
1574      }
1575
      /**
       * Releases this keep-alive handle by passing its {@code mss} state to
       * {@code release(MasterServiceState)}.
       */
      @Override
      public void close() {
        release(this.mss);
      }
1580
      /**
       * Delegates to {@code stub.getSchemaAlterStatus} and returns its response unchanged.
       */
      @Override
      public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
          RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
          throws ServiceException {
        return stub.getSchemaAlterStatus(controller, request);
      }
1587
      /**
       * Delegates to {@code stub.getTableDescriptors} and returns its response unchanged.
       */
      @Override
      public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(
          RpcController controller, MasterProtos.GetTableDescriptorsRequest request)
          throws ServiceException {
        return stub.getTableDescriptors(controller, request);
      }
1594
      /**
       * Delegates to {@code stub.getTableNames} and returns its response unchanged.
       */
      @Override
      public MasterProtos.GetTableNamesResponse getTableNames(
          RpcController controller, MasterProtos.GetTableNamesRequest request)
          throws ServiceException {
        return stub.getTableNames(controller, request);
      }
1601
      /**
       * Delegates to {@code stub.getClusterStatus} and returns its response unchanged.
       */
      @Override
      public MasterProtos.GetClusterStatusResponse getClusterStatus(
          RpcController controller, MasterProtos.GetClusterStatusRequest request)
          throws ServiceException {
        return stub.getClusterStatus(controller, request);
      }
1608
      /**
       * Delegates to {@code stub.setQuota} and returns its response unchanged.
       */
      @Override
      public MasterProtos.SetQuotaResponse setQuota(
          RpcController controller, MasterProtos.SetQuotaRequest request)
          throws ServiceException {
        return stub.setQuota(controller, request);
      }
1615
      /**
       * Delegates to {@code stub.getLastMajorCompactionTimestamp} and returns its response
       * unchanged.
       */
      @Override
      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
          RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
          throws ServiceException {
        return stub.getLastMajorCompactionTimestamp(controller, request);
      }
1622
      /**
       * Delegates to {@code stub.getLastMajorCompactionTimestampForRegion} and returns its
       * response unchanged.
       */
      @Override
      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
          RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
          throws ServiceException {
        return stub.getLastMajorCompactionTimestampForRegion(controller, request);
      }
1629
      /**
       * Delegates to {@code stub.isBalancerEnabled} and returns its response unchanged.
       */
      @Override
      public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
          IsBalancerEnabledRequest request) throws ServiceException {
        return stub.isBalancerEnabled(controller, request);
      }
1635
1636      @Override
1637      public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
1638        RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request)
1639        throws ServiceException {
1640        return stub.setSplitOrMergeEnabled(controller, request);
1641      }
1642
1643      @Override
1644      public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
1645        RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request)
1646              throws ServiceException {
1647        return stub.isSplitOrMergeEnabled(controller, request);
1648      }
1649
      /**
       * Delegates to {@code stub.isNormalizerEnabled} and returns its response unchanged.
       */
      @Override
      public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
          IsNormalizerEnabledRequest request) throws ServiceException {
        return stub.isNormalizerEnabled(controller, request);
      }
1655
      /**
       * Delegates to {@code stub.getSecurityCapabilities} and returns its response unchanged.
       */
      @Override
      public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
          SecurityCapabilitiesRequest request) throws ServiceException {
        return stub.getSecurityCapabilities(controller, request);
      }
1661
      /**
       * Delegates to {@code stub.addReplicationPeer} and returns its response unchanged.
       */
      @Override
      public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
          AddReplicationPeerRequest request) throws ServiceException {
        return stub.addReplicationPeer(controller, request);
      }
1667
      /**
       * Delegates to {@code stub.removeReplicationPeer} and returns its response unchanged.
       */
      @Override
      public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
          RemoveReplicationPeerRequest request) throws ServiceException {
        return stub.removeReplicationPeer(controller, request);
      }
1673
      /**
       * Delegates to {@code stub.enableReplicationPeer} and returns its response unchanged.
       */
      @Override
      public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
          EnableReplicationPeerRequest request) throws ServiceException {
        return stub.enableReplicationPeer(controller, request);
      }
1679
      /**
       * Delegates to {@code stub.disableReplicationPeer} and returns its response unchanged.
       */
      @Override
      public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
          DisableReplicationPeerRequest request) throws ServiceException {
        return stub.disableReplicationPeer(controller, request);
      }
1685
1686      @Override
1687      public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller,
1688          ListDecommissionedRegionServersRequest request) throws ServiceException {
1689        return stub.listDecommissionedRegionServers(controller, request);
1690      }
1691
      /**
       * Delegates to {@code stub.decommissionRegionServers} and returns its response unchanged.
       */
      @Override
      public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
          DecommissionRegionServersRequest request) throws ServiceException {
        return stub.decommissionRegionServers(controller, request);
      }
1697
      /**
       * Delegates to {@code stub.recommissionRegionServer} and returns its response unchanged.
       */
      @Override
      public RecommissionRegionServerResponse recommissionRegionServer(
          RpcController controller, RecommissionRegionServerRequest request)
          throws ServiceException {
        return stub.recommissionRegionServer(controller, request);
      }
1704
      /**
       * Delegates to {@code stub.getReplicationPeerConfig} and returns its response unchanged.
       */
      @Override
      public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
          GetReplicationPeerConfigRequest request) throws ServiceException {
        return stub.getReplicationPeerConfig(controller, request);
      }
1710
      /**
       * Delegates to {@code stub.updateReplicationPeerConfig} and returns its response unchanged.
       */
      @Override
      public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig(
          RpcController controller, UpdateReplicationPeerConfigRequest request)
          throws ServiceException {
        return stub.updateReplicationPeerConfig(controller, request);
      }
1717
      /**
       * Delegates to {@code stub.listReplicationPeers} and returns its response unchanged.
       */
      @Override
      public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
          ListReplicationPeersRequest request) throws ServiceException {
        return stub.listReplicationPeers(controller, request);
      }
1723
      /**
       * Delegates to {@code stub.getSpaceQuotaRegionSizes} and returns its response unchanged.
       */
      @Override
      public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
          RpcController controller, GetSpaceQuotaRegionSizesRequest request)
          throws ServiceException {
        return stub.getSpaceQuotaRegionSizes(controller, request);
      }
1730
      /**
       * Delegates to {@code stub.getQuotaStates} and returns its response unchanged.
       */
      @Override
      public GetQuotaStatesResponse getQuotaStates(
          RpcController controller, GetQuotaStatesRequest request) throws ServiceException {
        return stub.getQuotaStates(controller, request);
      }
1736
      /**
       * Delegates to {@code stub.clearDeadServers} and returns its response unchanged.
       */
      @Override
      public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller,
          MasterProtos.ClearDeadServersRequest request) throws ServiceException {
        return stub.clearDeadServers(controller, request);
      }
1742    };
1743  }
1744
1745  private static void release(MasterServiceState mss) {
1746    if (mss != null && mss.connection != null) {
1747      ((ConnectionImplementation)mss.connection).releaseMaster(mss);
1748    }
1749  }
1750
1751  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
1752    if (mss.getStub() == null){
1753      return false;
1754    }
1755    try {
1756      return mss.isMasterRunning();
1757    } catch (UndeclaredThrowableException e) {
1758      // It's somehow messy, but we can receive exceptions such as
1759      //  java.net.ConnectException but they're not declared. So we catch it...
1760      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
1761      return false;
1762    } catch (IOException se) {
1763      LOG.warn("Checking master connection", se);
1764      return false;
1765    }
1766  }
1767
1768  void releaseMaster(MasterServiceState mss) {
1769    if (mss.getStub() == null) {
1770      return;
1771    }
1772    synchronized (masterLock) {
1773      --mss.userCount;
1774    }
1775  }
1776
1777  private void closeMasterService(MasterServiceState mss) {
1778    if (mss.getStub() != null) {
1779      LOG.info("Closing master protocol: " + mss);
1780      mss.clearStub();
1781    }
1782    mss.userCount = 0;
1783  }
1784
1785  /**
1786   * Immediate close of the shared master. Can be by the delayed close or when closing the
1787   * connection itself.
1788   */
1789  private void closeMaster() {
1790    synchronized (masterLock) {
1791      closeMasterService(masterServiceState);
1792    }
1793  }
1794
  /**
   * Builds a new {@link HRegionLocation} from the given region, server and sequence number and
   * passes it to {@code cacheLocation} for the region's table.
   * @param hri the region whose location is being updated
   * @param source the server passed on to {@code cacheLocation} as the source of the update
   * @param serverName the server recorded in the new location
   * @param seqNum the sequence number recorded in the new location
   */
  void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) {
    HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
    cacheLocation(hri.getTable(), source, newHrl);
  }
1799
  /**
   * Removes the given location from the meta cache via {@code metaCache.clearCache(location)}.
   * @param location the cached region location to drop
   */
  @Override
  public void deleteCachedRegionLocation(final HRegionLocation location) {
    metaCache.clearCache(location);
  }
1804
1805  /**
1806   * Update the location with the new value (if the exception is a RegionMovedException)
1807   * or delete it from the cache. Does nothing if we can be sure from the exception that
1808   * the location is still accurate, or if the cache has already been updated.
1809   * @param exception an object (to simplify user code) on which we will try to find a nested
1810   *   or wrapped or both RegionMovedException
1811   * @param source server that is the source of the location update.
1812   */
1813  @Override
1814  public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
1815    final Object exception, final ServerName source) {
1816    if (rowkey == null || tableName == null) {
1817      LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
1818          ", tableName=" + (tableName == null ? "null" : tableName));
1819      return;
1820    }
1821
1822    if (source == null) {
1823      // This should not happen, but let's secure ourselves.
1824      return;
1825    }
1826
1827    if (regionName == null) {
1828      // we do not know which region, so just remove the cache entry for the row and server
1829      if (metrics != null) {
1830        metrics.incrCacheDroppingExceptions(exception);
1831      }
1832      metaCache.clearCache(tableName, rowkey, source);
1833      return;
1834    }
1835
1836    // Is it something we have already updated?
1837    final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
1838    HRegionLocation oldLocation = null;
1839    if (oldLocations != null) {
1840      oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
1841    }
1842    if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
1843      // There is no such location in the cache (it's been removed already) or
1844      // the cache has already been refreshed with a different location.  => nothing to do
1845      return;
1846    }
1847
1848    RegionInfo regionInfo = oldLocation.getRegion();
1849    Throwable cause = ClientExceptionsUtil.findException(exception);
1850    if (cause != null) {
1851      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
1852        // We know that the region is still on this region server
1853        return;
1854      }
1855
1856      if (cause instanceof RegionMovedException) {
1857        RegionMovedException rme = (RegionMovedException) cause;
1858        if (LOG.isTraceEnabled()) {
1859          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
1860              rme.getHostname() + ":" + rme.getPort() +
1861              " according to " + source.getAddress());
1862        }
1863        // We know that the region is not anymore on this region server, but we know
1864        //  the new location.
1865        updateCachedLocation(
1866            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
1867        return;
1868      }
1869    }
1870
1871    if (metrics != null) {
1872      metrics.incrCacheDroppingExceptions(exception);
1873    }
1874
1875    // If we're here, it means that can cannot be sure about the location, so we remove it from
1876    // the cache. Do not send the source because source can be a new server in the same host:port
1877    metaCache.clearCache(regionInfo);
1878  }
1879
  /**
   * @return the {@code asyncProcess} instance held by this connection
   */
  @Override
  public AsyncProcess getAsyncProcess() {
    return asyncProcess;
  }
1884
  /**
   * @return the {@code stats} tracker held by this connection
   */
  @Override
  public ServerStatisticTracker getStatisticsTracker() {
    return this.stats;
  }
1889
  /**
   * @return the {@code backoffPolicy} held by this connection
   */
  @Override
  public ClientBackoffPolicy getBackoffPolicy() {
    return this.backoffPolicy;
  }
1894
1895  /*
1896   * Return the number of cached region for a table. It will only be called
1897   * from a unit test.
1898   */
1899  @VisibleForTesting
1900  int getNumberOfCachedRegionLocations(final TableName tableName) {
1901    return metaCache.getNumberOfCachedRegionLocations(tableName);
1902  }
1903
1904  @Override
1905  public void abort(final String msg, Throwable t) {
1906    if (t != null) {
1907      LOG.error(HBaseMarkers.FATAL, msg, t);
1908    } else {
1909      LOG.error(HBaseMarkers.FATAL, msg);
1910    }
1911    this.aborted = true;
1912    close();
1913    this.closed = true;
1914  }
1915
  /**
   * @return the current value of the {@code closed} flag
   */
  @Override
  public boolean isClosed() {
    return this.closed;
  }
1920
  /**
   * @return the current value of the {@code aborted} flag
   */
  @Override
  public boolean isAborted(){
    return this.aborted;
  }
1925
  /**
   * Closes this connection and the resources it holds. Returns immediately when the connection
   * is already flagged as closed. Otherwise it closes the shared master service, shuts down the
   * pools and (if present) the metrics, marks the connection closed, and then closes the
   * registry, clears {@code stubs}, and closes the cluster status listener and the RPC client
   * when they are set.
   */
  @Override
  public void close() {
    if (this.closed) {
      return;
    }
    closeMaster();
    shutdownPools();
    if (this.metrics != null) {
      this.metrics.shutdown();
    }
    // The flag is set before the remaining resources are closed, so the connection reports
    // itself as closed even if one of the steps below throws.
    this.closed = true;
    registry.close();
    this.stubs.clear();
    if (clusterStatusListener != null) {
      clusterStatusListener.close();
    }
    if (rpcClient != null) {
      rpcClient.close();
    }
  }
1946
1947  /**
1948   * Close the connection for good. On the off chance that someone is unable to close
1949   * the connection, perhaps because it bailed out prematurely, the method
1950   * below will ensure that this instance is cleaned up.
1951   * Caveat: The JVM may take an unknown amount of time to call finalize on an
1952   * unreachable object, so our hope is that every consumer cleans up after
1953   * itself, like any good citizen.
1954   */
1955  @Override
1956  protected void finalize() throws Throwable {
1957    super.finalize();
1958    close();
1959  }
1960
  /**
   * @return the {@code nonceGenerator} held by this connection
   */
  @Override
  public NonceGenerator getNonceGenerator() {
    return nonceGenerator;
  }
1965
1966  @Override
1967  public TableState getTableState(TableName tableName) throws IOException {
1968    checkClosed();
1969    TableState tableState = MetaTableAccessor.getTableState(this, tableName);
1970    if (tableState == null) {
1971      throw new TableNotFoundException(tableName);
1972    }
1973    return tableState;
1974  }
1975
  /**
   * Creates a caller factory through {@code RpcRetryingCallerFactory.instantiate}, passing the
   * given configuration together with this connection's interceptor and statistics tracker.
   * @param conf the configuration handed to the factory
   * @return the factory returned by {@code RpcRetryingCallerFactory.instantiate}
   */
  @Override
  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
    return RpcRetryingCallerFactory
        .instantiate(conf, this.interceptor, this.getStatisticsTracker());
  }
1981
  /**
   * @return whatever {@code rpcClient.hasCellBlockSupport()} reports
   */
  @Override
  public boolean hasCellBlockSupport() {
    return this.rpcClient.hasCellBlockSupport();
  }
1986
  /**
   * @return the {@code connectionConfig} held by this connection
   */
  @Override
  public ConnectionConfiguration getConnectionConfiguration() {
    return this.connectionConfig;
  }
1991
  /**
   * @return the {@code rpcCallerFactory} held by this connection
   */
  @Override
  public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
    return this.rpcCallerFactory;
  }
1996
  /**
   * @return the {@code rpcControllerFactory} held by this connection
   */
  @Override
  public RpcControllerFactory getRpcControllerFactory() {
    return this.rpcControllerFactory;
  }
2001
2002  private static <T> T get(CompletableFuture<T> future) throws IOException {
2003    try {
2004      return future.get();
2005    } catch (InterruptedException e) {
2006      Thread.currentThread().interrupt();
2007      throw (IOException) new InterruptedIOException().initCause(e);
2008    } catch (ExecutionException e) {
2009      Throwable cause = e.getCause();
2010      Throwables.propagateIfPossible(cause, IOException.class);
2011      throw new IOException(cause);
2012    }
2013  }
2014}