001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 024import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 025import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; 026import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsentEx; 027 028import edu.umd.cs.findbugs.annotations.Nullable; 029import java.io.Closeable; 030import java.io.IOException; 031import java.io.InterruptedIOException; 032import java.lang.reflect.UndeclaredThrowableException; 033import java.util.ArrayList; 034import java.util.Collections; 035import java.util.Date; 036import java.util.List; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.ExecutorService; 043import java.util.concurrent.LinkedBlockingQueue; 044import java.util.concurrent.ThreadPoolExecutor; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.atomic.AtomicInteger; 047import java.util.concurrent.locks.ReentrantLock; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.CallQueueTooBigException; 050import org.apache.hadoop.hbase.DoNotRetryIOException; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.HRegionLocation; 053import org.apache.hadoop.hbase.MasterNotRunningException; 054import org.apache.hadoop.hbase.MetaTableAccessor; 055import org.apache.hadoop.hbase.RegionLocations; 056import org.apache.hadoop.hbase.ServerName; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.TableNotEnabledException; 059import org.apache.hadoop.hbase.TableNotFoundException; 060import org.apache.hadoop.hbase.ZooKeeperConnectionException; 061import org.apache.hadoop.hbase.client.Scan.ReadType; 062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 063import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 064import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 065import org.apache.hadoop.hbase.exceptions.RegionMovedException; 066import org.apache.hadoop.hbase.ipc.RpcClient; 067import org.apache.hadoop.hbase.ipc.RpcClientFactory; 068import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 069import org.apache.hadoop.hbase.log.HBaseMarkers; 070import 
org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 071import org.apache.hadoop.hbase.security.User; 072import org.apache.hadoop.hbase.util.Bytes; 073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 074import org.apache.hadoop.hbase.util.ExceptionUtil; 075import org.apache.hadoop.hbase.util.Pair; 076import org.apache.hadoop.hbase.util.ReflectionUtils; 077import org.apache.hadoop.hbase.util.Threads; 078import org.apache.hadoop.ipc.RemoteException; 079import org.apache.yetus.audience.InterfaceAudience; 080import org.apache.zookeeper.KeeperException; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083 084import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 085import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 086import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 087import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 088import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 089import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 090import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 115import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;

/**
 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
 * Encapsulates connection to zookeeper and regionservers.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
    justification="Access to the concurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
class ConnectionImplementation implements ClusterConnection, Closeable {
  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

  private final boolean hostnamesCanChange;
  private final long pause;
  private final long pauseForCQTBE; // pause for CallQueueTooBigException, if specified
  private boolean useMetaReplicas;
  private final int metaReplicaCallTimeoutScanInMicroSecond;
  private final int numTries;
  final int rpcTimeout;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy.
*/ 158 private static final Object nonceGeneratorCreateLock = new Object(); 159 160 private final AsyncProcess asyncProcess; 161 // single tracker per connection 162 private final ServerStatisticTracker stats; 163 164 private volatile boolean closed; 165 private volatile boolean aborted; 166 167 // package protected for the tests 168 ClusterStatusListener clusterStatusListener; 169 170 private final Object metaRegionLock = new Object(); 171 172 private final Object masterLock = new Object(); 173 174 // thread executor shared by all Table instances created 175 // by this connection 176 private volatile ExecutorService batchPool = null; 177 // meta thread executor shared by all Table instances created 178 // by this connection 179 private volatile ExecutorService metaLookupPool = null; 180 private volatile boolean cleanupPool = false; 181 182 private final Configuration conf; 183 184 // cache the configuration value for tables so that we can avoid calling 185 // the expensive Configuration to fetch the value multiple times. 186 private final ConnectionConfiguration connectionConfig; 187 188 // Client rpc instance. 189 private final RpcClient rpcClient; 190 191 private final MetaCache metaCache; 192 private final MetricsConnection metrics; 193 194 protected User user; 195 196 private final RpcRetryingCallerFactory rpcCallerFactory; 197 198 private final RpcControllerFactory rpcControllerFactory; 199 200 private final RetryingCallerInterceptor interceptor; 201 202 /** 203 * Cluster registry of basic info such as clusterid and meta region location. 204 */ 205 private final AsyncRegistry registry; 206 207 private final ClientBackoffPolicy backoffPolicy; 208 209 /** 210 * Allow setting an alternate BufferedMutator implementation via 211 * config. If null, use default. 
212 */ 213 private final String alternateBufferedMutatorClassName; 214 215 /** lock guards against multiple threads trying to query the meta region at the same time */ 216 private final ReentrantLock userRegionLock = new ReentrantLock(); 217 218 /** 219 * constructor 220 * @param conf Configuration object 221 */ 222 ConnectionImplementation(Configuration conf, 223 ExecutorService pool, User user) throws IOException { 224 this.conf = conf; 225 this.user = user; 226 this.batchPool = pool; 227 this.connectionConfig = new ConnectionConfiguration(conf); 228 this.closed = false; 229 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 230 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 231 long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); 232 if (configuredPauseForCQTBE < pause) { 233 LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " 234 + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE 235 + ", will use " + pause + " instead."); 236 this.pauseForCQTBE = pause; 237 } else { 238 this.pauseForCQTBE = configuredPauseForCQTBE; 239 } 240 this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, 241 HConstants.DEFAULT_USE_META_REPLICAS); 242 this.metaReplicaCallTimeoutScanInMicroSecond = 243 connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan(); 244 245 // how many times to try, one more than max *retry* time 246 this.numTries = retries2Attempts(connectionConfig.getRetriesNumber()); 247 this.rpcTimeout = conf.getInt( 248 HConstants.HBASE_RPC_TIMEOUT_KEY, 249 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 250 if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) { 251 synchronized (nonceGeneratorCreateLock) { 252 if (nonceGenerator == null) { 253 nonceGenerator = PerClientRandomNonceGenerator.get(); 254 } 255 } 256 } else { 257 nonceGenerator = NO_NONCE_GENERATOR; 258 } 259 260 this.stats = ServerStatisticTracker.create(conf); 261 this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); 262 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 263 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); 264 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); 265 this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); 266 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { 267 this.metrics = new MetricsConnection(this); 268 } else { 269 this.metrics = null; 270 } 271 this.metaCache = new MetaCache(this.metrics); 272 273 boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, 274 HConstants.STATUS_PUBLISHED_DEFAULT); 275 this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); 276 Class<? extends ClusterStatusListener.Listener> listenerClass = 277 conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, 278 ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, 279 ClusterStatusListener.Listener.class); 280 281 // Is there an alternate BufferedMutator to use? 282 this.alternateBufferedMutatorClassName = 283 this.conf.get(BufferedMutator.CLASSNAME_KEY); 284 285 try { 286 this.registry = AsyncRegistryFactory.getRegistry(conf); 287 retrieveClusterId(); 288 289 this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); 290 291 // Do we publish the status? 
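    // Illustrative only, not executed here: status publishing is an opt-in deployment setting.
    // A client that wants dead-server notifications would typically set something like
    //   conf.setBoolean(HConstants.STATUS_PUBLISHED, true);
    // and, if needed, point ClusterStatusListener.STATUS_LISTENER_CLASS at a listener
    // implementation; the DEFAULT_STATUS_LISTENER_CLASS constant is what is consulted below.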
292 if (shouldListen) { 293 if (listenerClass == null) { 294 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + 295 ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); 296 } else { 297 clusterStatusListener = new ClusterStatusListener( 298 new ClusterStatusListener.DeadServerHandler() { 299 @Override 300 public void newDead(ServerName sn) { 301 clearCaches(sn); 302 rpcClient.cancelConnections(sn); 303 } 304 }, conf, listenerClass); 305 } 306 } 307 } catch (Throwable e) { 308 // avoid leaks: registry, rpcClient, ... 309 LOG.debug("connection construction failed", e); 310 close(); 311 throw e; 312 } 313 } 314 315 /** 316 * @param useMetaReplicas 317 */ 318 @VisibleForTesting 319 void setUseMetaReplicas(final boolean useMetaReplicas) { 320 this.useMetaReplicas = useMetaReplicas; 321 } 322 323 /** 324 * @param conn The connection for which to replace the generator. 325 * @param cnm Replaces the nonce generator used, for testing. 326 * @return old nonce generator. 327 */ 328 @VisibleForTesting 329 static NonceGenerator injectNonceGeneratorForTesting( 330 ClusterConnection conn, NonceGenerator cnm) { 331 ConnectionImplementation connImpl = (ConnectionImplementation)conn; 332 NonceGenerator ng = connImpl.getNonceGenerator(); 333 LOG.warn("Nonce generator is being replaced by test code for " 334 + cnm.getClass().getName()); 335 nonceGenerator = cnm; 336 return ng; 337 } 338 339 @Override 340 public Table getTable(TableName tableName) throws IOException { 341 return getTable(tableName, getBatchPool()); 342 } 343 344 @Override 345 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 346 return new TableBuilderBase(tableName, connectionConfig) { 347 348 @Override 349 public Table build() { 350 return new HTable(ConnectionImplementation.this, this, rpcCallerFactory, 351 rpcControllerFactory, pool); 352 } 353 }; 354 } 355 356 @Override 357 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) { 358 if (params.getTableName() == null) { 359 throw new IllegalArgumentException("TableName cannot be null."); 360 } 361 if (params.getPool() == null) { 362 params.pool(HTable.getDefaultExecutor(getConfiguration())); 363 } 364 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { 365 params.writeBufferSize(connectionConfig.getWriteBufferSize()); 366 } 367 if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) { 368 params.setWriteBufferPeriodicFlushTimeoutMs( 369 connectionConfig.getWriteBufferPeriodicFlushTimeoutMs()); 370 } 371 if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) { 372 params.setWriteBufferPeriodicFlushTimerTickMs( 373 connectionConfig.getWriteBufferPeriodicFlushTimerTickMs()); 374 } 375 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { 376 params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); 377 } 378 // Look to see if an alternate BufferedMutation implementation is wanted. 379 // Look in params and in config. If null, use default. 
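    // Illustrative sketch only (not executed here); the class name below is hypothetical.
    // Callers can choose the implementation per request on the params, or globally through the
    // configuration value captured in alternateBufferedMutatorClassName, roughly like:
    //   params.implementationClassName("org.example.MyBufferedMutator");
    //   // or: conf.set(BufferedMutator.CLASSNAME_KEY, "org.example.MyBufferedMutator");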
380 String implementationClassName = params.getImplementationClassName(); 381 if (implementationClassName == null) { 382 implementationClassName = this.alternateBufferedMutatorClassName; 383 } 384 if (implementationClassName == null) { 385 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); 386 } 387 try { 388 return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName), 389 this, rpcCallerFactory, rpcControllerFactory, params); 390 } catch (ClassNotFoundException e) { 391 throw new RuntimeException(e); 392 } 393 } 394 395 @Override 396 public BufferedMutator getBufferedMutator(TableName tableName) { 397 return getBufferedMutator(new BufferedMutatorParams(tableName)); 398 } 399 400 @Override 401 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 402 return new HRegionLocator(tableName, this); 403 } 404 405 @Override 406 public Admin getAdmin() throws IOException { 407 return new HBaseAdmin(this); 408 } 409 410 @Override 411 public Hbck getHbck() throws IOException { 412 return getHbck(get(registry.getMasterAddress())); 413 } 414 415 @Override 416 public Hbck getHbck(ServerName masterServer) throws IOException { 417 checkClosed(); 418 if (isDeadServer(masterServer)) { 419 throw new RegionServerStoppedException(masterServer + " is dead."); 420 } 421 String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), 422 masterServer, this.hostnamesCanChange); 423 424 return new HBaseHbck( 425 (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 426 BlockingRpcChannel channel = 427 this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout); 428 return MasterProtos.HbckService.newBlockingStub(channel); 429 }), rpcControllerFactory); 430 } 431 432 @Override 433 public MetricsConnection getConnectionMetrics() { 434 return this.metrics; 435 } 436 437 private ExecutorService getBatchPool() { 438 if (batchPool == null) { 439 synchronized (this) { 440 if (batchPool == null) { 441 int threads = conf.getInt("hbase.hconnection.threads.max", 256); 442 this.batchPool = getThreadPool(threads, threads, "-shared", null); 443 this.cleanupPool = true; 444 } 445 } 446 } 447 return this.batchPool; 448 } 449 450 private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint, 451 BlockingQueue<Runnable> passedWorkQueue) { 452 // shared HTable thread executor not yet initialized 453 if (maxThreads == 0) { 454 maxThreads = Runtime.getRuntime().availableProcessors() * 8; 455 } 456 if (coreThreads == 0) { 457 coreThreads = Runtime.getRuntime().availableProcessors() * 8; 458 } 459 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); 460 BlockingQueue<Runnable> workQueue = passedWorkQueue; 461 if (workQueue == null) { 462 workQueue = 463 new LinkedBlockingQueue<>(maxThreads * 464 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 465 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); 466 coreThreads = maxThreads; 467 } 468 ThreadPoolExecutor tpe = new ThreadPoolExecutor( 469 coreThreads, 470 maxThreads, 471 keepAliveTime, 472 TimeUnit.SECONDS, 473 workQueue, 474 Threads.newDaemonThreadFactory(toString() + nameHint)); 475 tpe.allowCoreThreadTimeOut(true); 476 return tpe; 477 } 478 479 private ExecutorService getMetaLookupPool() { 480 if (this.metaLookupPool == null) { 481 synchronized (this) { 482 if (this.metaLookupPool == null) { 483 //Some of the threads would be used for meta replicas 484 //To start with, 
threads.max.core threads can hit the meta (including replicas). 485 //After that, requests will get queued up in the passed queue, and only after 486 //the queue is full, a new thread will be started 487 int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128); 488 this.metaLookupPool = getThreadPool( 489 threads, 490 threads, 491 "-metaLookup-shared-", new LinkedBlockingQueue<>()); 492 } 493 } 494 } 495 return this.metaLookupPool; 496 } 497 498 protected ExecutorService getCurrentMetaLookupPool() { 499 return metaLookupPool; 500 } 501 502 protected ExecutorService getCurrentBatchPool() { 503 return batchPool; 504 } 505 506 private void shutdownPools() { 507 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { 508 shutdownBatchPool(this.batchPool); 509 } 510 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) { 511 shutdownBatchPool(this.metaLookupPool); 512 } 513 } 514 515 private void shutdownBatchPool(ExecutorService pool) { 516 pool.shutdown(); 517 try { 518 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { 519 pool.shutdownNow(); 520 } 521 } catch (InterruptedException e) { 522 pool.shutdownNow(); 523 } 524 } 525 526 /** 527 * For tests only. 528 */ 529 @VisibleForTesting 530 RpcClient getRpcClient() { 531 return rpcClient; 532 } 533 534 /** 535 * An identifier that will remain the same for a given connection. 536 */ 537 @Override 538 public String toString(){ 539 return "hconnection-0x" + Integer.toHexString(hashCode()); 540 } 541 542 protected String clusterId = null; 543 544 protected void retrieveClusterId() { 545 if (clusterId != null) { 546 return; 547 } 548 try { 549 this.clusterId = this.registry.getClusterId().get(); 550 } catch (InterruptedException | ExecutionException e) { 551 LOG.warn("Retrieve cluster id failed", e); 552 } 553 if (clusterId == null) { 554 clusterId = HConstants.CLUSTER_ID_DEFAULT; 555 LOG.debug("clusterid came back null, using default " + clusterId); 556 } 557 } 558 559 @Override 560 public Configuration getConfiguration() { 561 return this.conf; 562 } 563 564 private void checkClosed() throws DoNotRetryIOException { 565 if (this.closed) { 566 throw new DoNotRetryIOException(toString() + " closed"); 567 } 568 } 569 570 /** 571 * @return true if the master is running, throws an exception otherwise 572 * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running 573 * @deprecated this has been deprecated without a replacement 574 */ 575 @Deprecated 576 @Override 577 public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException { 578 // When getting the master connection, we check it's running, 579 // so if there is no exception, it means we've been able to get a 580 // connection on a running master 581 MasterKeepAliveConnection m; 582 try { 583 m = getKeepAliveMasterService(); 584 } catch (IOException e) { 585 throw new MasterNotRunningException(e); 586 } 587 m.close(); 588 return true; 589 } 590 591 @Override 592 public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row, 593 boolean reload) throws IOException { 594 return reload ? 
relocateRegion(tableName, row) : locateRegion(tableName, row); 595 } 596 597 598 @Override 599 public boolean isTableEnabled(TableName tableName) throws IOException { 600 return getTableState(tableName).inStates(TableState.State.ENABLED); 601 } 602 603 @Override 604 public boolean isTableDisabled(TableName tableName) throws IOException { 605 return getTableState(tableName).inStates(TableState.State.DISABLED); 606 } 607 608 @Override 609 public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys) 610 throws IOException { 611 checkClosed(); 612 try { 613 if (!isTableEnabled(tableName)) { 614 LOG.debug("Table {} not enabled", tableName); 615 return false; 616 } 617 List<Pair<RegionInfo, ServerName>> locations = 618 MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true); 619 620 int notDeployed = 0; 621 int regionCount = 0; 622 for (Pair<RegionInfo, ServerName> pair : locations) { 623 RegionInfo info = pair.getFirst(); 624 if (pair.getSecond() == null) { 625 LOG.debug("Table {} has not deployed region {}", tableName, 626 pair.getFirst().getEncodedName()); 627 notDeployed++; 628 } else if (splitKeys != null 629 && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 630 for (byte[] splitKey : splitKeys) { 631 // Just check if the splitkey is available 632 if (Bytes.equals(info.getStartKey(), splitKey)) { 633 regionCount++; 634 break; 635 } 636 } 637 } else { 638 // Always empty start row should be counted 639 regionCount++; 640 } 641 } 642 if (notDeployed > 0) { 643 if (LOG.isDebugEnabled()) { 644 LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed); 645 } 646 return false; 647 } else if (splitKeys != null && regionCount != splitKeys.length + 1) { 648 if (LOG.isDebugEnabled()) { 649 LOG.debug("Table {} expected to have {} regions, but only {} available", tableName, 650 splitKeys.length + 1, regionCount); 651 } 652 return false; 653 } else { 654 LOG.trace("Table {} should be available", tableName); 655 return true; 656 } 657 } catch (TableNotFoundException tnfe) { 658 LOG.warn("Table {} does not exist", tableName); 659 return false; 660 } 661 } 662 663 @Override 664 public HRegionLocation locateRegion(final byte[] regionName) throws IOException { 665 RegionLocations locations = locateRegion(RegionInfo.getTable(regionName), 666 RegionInfo.getStartKey(regionName), false, true); 667 return locations == null ? 
null : locations.getRegionLocation(); 668 } 669 670 private boolean isDeadServer(ServerName sn) { 671 if (clusterStatusListener == null) { 672 return false; 673 } else { 674 return clusterStatusListener.isDeadServer(sn); 675 } 676 } 677 678 @Override 679 public List<HRegionLocation> locateRegions(TableName tableName) throws IOException { 680 return locateRegions(tableName, false, true); 681 } 682 683 @Override 684 public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache, 685 boolean offlined) throws IOException { 686 List<RegionInfo> regions; 687 if (TableName.isMetaTableName(tableName)) { 688 regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO); 689 } else { 690 regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined); 691 } 692 List<HRegionLocation> locations = new ArrayList<>(); 693 for (RegionInfo regionInfo : regions) { 694 if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) { 695 continue; 696 } 697 RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true); 698 if (list != null) { 699 for (HRegionLocation loc : list.getRegionLocations()) { 700 if (loc != null) { 701 locations.add(loc); 702 } 703 } 704 } 705 } 706 return locations; 707 } 708 709 @Override 710 public HRegionLocation locateRegion(final TableName tableName, final byte[] row) 711 throws IOException { 712 RegionLocations locations = locateRegion(tableName, row, true, true); 713 return locations == null ? null : locations.getRegionLocation(); 714 } 715 716 @Override 717 public HRegionLocation relocateRegion(final TableName tableName, final byte[] row) 718 throws IOException { 719 RegionLocations locations = 720 relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 721 return locations == null ? null 722 : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID); 723 } 724 725 @Override 726 public RegionLocations relocateRegion(final TableName tableName, 727 final byte [] row, int replicaId) throws IOException{ 728 // Since this is an explicit request not to use any caching, finding 729 // disabled tables should not be desirable. This will ensure that an exception is thrown when 730 // the first time a disabled table is interacted with. 
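    // Effect sketch (no new behavior; the table name is hypothetical): because the cache is
    // bypassed here, a disabled table is detected up front, so a call such as
    //   connection.relocateRegion(TableName.valueOf("t1"), row);
    // fails fast with TableNotEnabledException instead of burning retries on region lookups.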
731 if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) { 732 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); 733 } 734 735 return locateRegion(tableName, row, false, true, replicaId); 736 } 737 738 @Override 739 public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, 740 boolean retry) throws IOException { 741 return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); 742 } 743 744 @Override 745 public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, 746 boolean retry, int replicaId) throws IOException { 747 checkClosed(); 748 if (tableName == null || tableName.getName().length == 0) { 749 throw new IllegalArgumentException("table name cannot be null or zero length"); 750 } 751 if (tableName.equals(TableName.META_TABLE_NAME)) { 752 return locateMeta(tableName, useCache, replicaId); 753 } else { 754 // Region not in the cache - have to go to the meta RS 755 return locateRegionInMeta(tableName, row, useCache, retry, replicaId); 756 } 757 } 758 759 private RegionLocations locateMeta(final TableName tableName, 760 boolean useCache, int replicaId) throws IOException { 761 // HBASE-10785: We cache the location of the META itself, so that we are not overloading 762 // zookeeper with one request for every region lookup. We cache the META with empty row 763 // key in MetaCache. 764 byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta 765 RegionLocations locations = null; 766 if (useCache) { 767 locations = getCachedLocation(tableName, metaCacheKey); 768 if (locations != null && locations.getRegionLocation(replicaId) != null) { 769 return locations; 770 } 771 } 772 773 // only one thread should do the lookup. 774 synchronized (metaRegionLock) { 775 // Check the cache again for a hit in case some other thread made the 776 // same query while we were waiting on the lock. 777 if (useCache) { 778 locations = getCachedLocation(tableName, metaCacheKey); 779 if (locations != null && locations.getRegionLocation(replicaId) != null) { 780 return locations; 781 } 782 } 783 784 // Look up from zookeeper 785 locations = get(this.registry.getMetaRegionLocation()); 786 if (locations != null) { 787 cacheLocation(tableName, locations); 788 } 789 } 790 return locations; 791 } 792 793 /** 794 * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're 795 * seeking. 796 */ 797 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache, 798 boolean retry, int replicaId) throws IOException { 799 // If we are supposed to be using the cache, look in the cache to see if we already have the 800 // region. 801 if (useCache) { 802 RegionLocations locations = getCachedLocation(tableName, row); 803 if (locations != null && locations.getRegionLocation(replicaId) != null) { 804 return locations; 805 } 806 } 807 // build the key of the meta region we should be looking for. 808 // the extra 9's on the end are necessary to allow "exact" matches 809 // without knowing the precise region names. 
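    // Key-shape sketch (values illustrative): for table "t1" and row "r7" the keys built below
    // are roughly metaStartKey = "t1,r7,99999999999999" (HConstants.NINES as the region id)
    // and metaStopKey = "t1,,", so the reversed hbase:meta scan started at metaStartKey first
    // returns the meta row of the region whose start key is closest at or before "r7".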
    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
    byte[] metaStopKey =
      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
      .setReadType(ReadType.PREAD);
    if (this.useMetaReplicas) {
      s.setConsistency(Consistency.TIMELINE);
    }
    int maxAttempts = (retry ? numTries : 1);
    for (int tries = 0; ; tries++) {
      if (tries >= maxAttempts) {
        throw new NoServerForRegionException("Unable to find region for "
          + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
      }
      if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      } else {
        // If we are not supposed to be using the cache, delete any existing cached location
        // so it won't interfere.
        // We are only supposed to clean the cache for the specific replicaId
        metaCache.clearCache(tableName, row, replicaId);
      }
      // Query the meta region
      long pauseBase = this.pause;
      userRegionLock.lock();
      try {
        if (useCache) { // re-check the cache after getting the lock
          RegionLocations locations = getCachedLocation(tableName, row);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
          }
        }
        s.resetMvccReadPoint();
        try (ReversedClientScanner rcs =
          new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
          boolean tableNotFound = true;
          for (;;) {
            Result regionInfoRow = rcs.next();
            if (regionInfoRow == null) {
              if (tableNotFound) {
                throw new TableNotFoundException(tableName);
              } else {
                throw new IOException(
                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
              }
            }
            tableNotFound = false;
            // convert the row result into the HRegionLocation we need!
            RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
            if (locations == null || locations.getRegionLocation(replicaId) == null) {
              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
            }
            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
            if (regionInfo == null) {
              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
                ", row=" + regionInfoRow);
            }
            // See HBASE-20182. It is possible that we locate to a split parent even after the
            // children are online, so here we need to skip this region and go to the next one.
            if (regionInfo.isSplitParent()) {
              continue;
            }
            if (regionInfo.isOffline()) {
              throw new RegionOfflineException("Region offline; disable table call? " +
                regionInfo.getRegionNameAsString());
            }
            // It is possible that the split children have not been online yet and we have skipped
            // the parent in the above condition, so we may have already reached a region which
            // does not contain us.
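            // Worked example of that corner case (boundaries illustrative): looking for row "c",
            // the reversed scan may first return split parent [b,d), which is skipped above; the
            // next meta row is then the neighbouring region [a,b), which does not contain "c",
            // so the check below throws and the lookup is retried once the daughters appear.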
            if (!regionInfo.containsRow(row)) {
              throw new IOException(
                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
            }
            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
            if (serverName == null) {
              throw new NoServerForRegionException("No server address listed in " +
                TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
                " containing row " + Bytes.toStringBinary(row));
            }
            if (isDeadServer(serverName)) {
              throw new RegionServerStoppedException(
                "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
                " is managed by the server " + serverName + ", but it is dead.");
            }
            // Cache the location we just found and return it.
            cacheLocation(tableName, locations);
            return locations;
          }
        }
      } catch (TableNotFoundException e) {
        // if we got this error, probably means the table just plain doesn't
        // exist. rethrow the error immediately. this should always be coming
        // from the HTable constructor.
        throw e;
      } catch (IOException e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (e instanceof RemoteException) {
          e = ((RemoteException) e).unwrapRemoteException();
        }
        if (e instanceof CallQueueTooBigException) {
          // Give a special check on CallQueueTooBigException, see #HBASE-17114
          pauseBase = this.pauseForCQTBE;
        }
        if (tries < maxAttempts - 1) {
          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying "
            + "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts,
            ConnectionUtils.getPauseTime(pauseBase, tries), e);
        } else {
          throw e;
        }
        // Only relocate the parent region if necessary
        if (!(e instanceof RegionOfflineException ||
            e instanceof NoServerForRegionException)) {
          relocateRegion(TableName.META_TABLE_NAME, metaStartKey, replicaId);
        }
      } finally {
        userRegionLock.unlock();
      }
      try {
        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Giving up trying to locate region in " +
          "meta: thread is interrupted.");
      }
    }
  }

  /**
   * Put a newly discovered HRegionLocation into the cache.
   * @param tableName The table name.
   * @param location the new location
   */
  @Override
  public void cacheLocation(final TableName tableName, final RegionLocations location) {
    metaCache.cacheLocation(tableName, location);
  }

  /**
   * Search the cache for a location that fits our table and row key.
   * Return null if no suitable region is located.
   * @return Null or region location found in cache.
   */
  RegionLocations getCachedLocation(final TableName tableName,
      final byte[] row) {
    return metaCache.getCachedLocation(tableName, row);
  }

  public void clearRegionCache(final TableName tableName, byte[] row) {
    metaCache.clearCache(tableName, row);
  }

  /*
   * Delete all cached entries of a table that maps to a specific location.
   */
  @Override
  public void clearCaches(final ServerName serverName) {
    metaCache.clearCache(serverName);
  }

  @Override
  public void clearRegionCache() {
    metaCache.clearCache();
  }

  @Override
  public void clearRegionCache(final TableName tableName) {
    metaCache.clearCache(tableName);
  }

  /**
   * Put a newly discovered HRegionLocation into the cache.
   * @param tableName The table name.
986 * @param source the source of the new location, if it's not coming from meta 987 * @param location the new location 988 */ 989 private void cacheLocation(final TableName tableName, final ServerName source, 990 final HRegionLocation location) { 991 metaCache.cacheLocation(tableName, source, location); 992 } 993 994 // Map keyed by service name + regionserver to service stub implementation 995 private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>(); 996 997 /** 998 * State of the MasterService connection/setup. 999 */ 1000 static class MasterServiceState { 1001 Connection connection; 1002 1003 MasterProtos.MasterService.BlockingInterface stub; 1004 int userCount; 1005 1006 MasterServiceState(final Connection connection) { 1007 super(); 1008 this.connection = connection; 1009 } 1010 1011 @Override 1012 public String toString() { 1013 return "MasterService"; 1014 } 1015 1016 Object getStub() { 1017 return this.stub; 1018 } 1019 1020 void clearStub() { 1021 this.stub = null; 1022 } 1023 1024 boolean isMasterRunning() throws IOException { 1025 MasterProtos.IsMasterRunningResponse response = null; 1026 try { 1027 response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); 1028 } catch (Exception e) { 1029 throw ProtobufUtil.handleRemoteException(e); 1030 } 1031 return response != null? response.getIsMasterRunning(): false; 1032 } 1033 } 1034 1035 /** 1036 * The record of errors for servers. 1037 */ 1038 static class ServerErrorTracker { 1039 // We need a concurrent map here, as we could have multiple threads updating it in parallel. 1040 private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>(); 1041 private final long canRetryUntil; 1042 private final int maxTries;// max number to try 1043 private final long startTrackingTime; 1044 1045 /** 1046 * Constructor 1047 * @param timeout how long to wait before timeout, in unit of millisecond 1048 * @param maxTries how many times to try 1049 */ 1050 public ServerErrorTracker(long timeout, int maxTries) { 1051 this.maxTries = maxTries; 1052 this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; 1053 this.startTrackingTime = new Date().getTime(); 1054 } 1055 1056 /** 1057 * We stop to retry when we have exhausted BOTH the number of tries and the time allocated. 1058 * @param numAttempt how many times we have tried by now 1059 */ 1060 boolean canTryMore(int numAttempt) { 1061 // If there is a single try we must not take into account the time. 1062 return numAttempt < maxTries || (maxTries > 1 && 1063 EnvironmentEdgeManager.currentTime() < this.canRetryUntil); 1064 } 1065 1066 /** 1067 * Calculates the back-off time for a retrying request to a particular server. 1068 * 1069 * @param server The server in question. 1070 * @param basePause The default hci pause. 1071 * @return The time to wait before sending next request. 1072 */ 1073 long calculateBackoffTime(ServerName server, long basePause) { 1074 long result; 1075 ServerErrors errorStats = errorsByServer.get(server); 1076 if (errorStats != null) { 1077 result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1)); 1078 } else { 1079 result = 0; // yes, if the server is not in our list we don't wait before retrying. 1080 } 1081 return result; 1082 } 1083 1084 /** 1085 * Reports that there was an error on the server to do whatever bean-counting necessary. 1086 * @param server The server in question. 
1087 */ 1088 void reportServerError(ServerName server) { 1089 computeIfAbsent(errorsByServer, server, ServerErrors::new).addError(); 1090 } 1091 1092 long getStartTrackingTime() { 1093 return startTrackingTime; 1094 } 1095 1096 /** 1097 * The record of errors for a server. 1098 */ 1099 private static class ServerErrors { 1100 private final AtomicInteger retries = new AtomicInteger(0); 1101 1102 public int getCount() { 1103 return retries.get(); 1104 } 1105 1106 public void addError() { 1107 retries.incrementAndGet(); 1108 } 1109 } 1110 } 1111 1112 /** 1113 * Class to make a MasterServiceStubMaker stub. 1114 */ 1115 private final class MasterServiceStubMaker { 1116 1117 private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub) 1118 throws IOException { 1119 try { 1120 stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); 1121 } catch (ServiceException e) { 1122 throw ProtobufUtil.handleRemoteException(e); 1123 } 1124 } 1125 1126 /** 1127 * Create a stub. Try once only. It is not typed because there is no common type to protobuf 1128 * services nor their interfaces. Let the caller do appropriate casting. 1129 * @return A stub for master services. 1130 */ 1131 private MasterProtos.MasterService.BlockingInterface makeStubNoRetries() 1132 throws IOException, KeeperException { 1133 ServerName sn = get(registry.getMasterAddress()); 1134 if (sn == null) { 1135 String msg = "ZooKeeper available but no active master location found"; 1136 LOG.info(msg); 1137 throw new MasterNotRunningException(msg); 1138 } 1139 if (isDeadServer(sn)) { 1140 throw new MasterNotRunningException(sn + " is dead."); 1141 } 1142 // Use the security info interface name as our stub key 1143 String key = 1144 getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange); 1145 MasterProtos.MasterService.BlockingInterface stub = 1146 (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1147 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); 1148 return MasterProtos.MasterService.newBlockingStub(channel); 1149 }); 1150 isMasterRunning(stub); 1151 return stub; 1152 } 1153 1154 /** 1155 * Create a stub against the master. Retry if necessary. 
1156 * @return A stub to do <code>intf</code> against the master 1157 * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running 1158 */ 1159 MasterProtos.MasterService.BlockingInterface makeStub() throws IOException { 1160 // The lock must be at the beginning to prevent multiple master creations 1161 // (and leaks) in a multithread context 1162 synchronized (masterLock) { 1163 Exception exceptionCaught = null; 1164 if (!closed) { 1165 try { 1166 return makeStubNoRetries(); 1167 } catch (IOException e) { 1168 exceptionCaught = e; 1169 } catch (KeeperException e) { 1170 exceptionCaught = e; 1171 } 1172 throw new MasterNotRunningException(exceptionCaught); 1173 } else { 1174 throw new DoNotRetryIOException("Connection was closed while trying to get master"); 1175 } 1176 } 1177 } 1178 } 1179 1180 @Override 1181 public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException { 1182 return getAdmin(get(registry.getMasterAddress())); 1183 } 1184 1185 @Override 1186 public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName) 1187 throws IOException { 1188 checkClosed(); 1189 if (isDeadServer(serverName)) { 1190 throw new RegionServerStoppedException(serverName + " is dead."); 1191 } 1192 String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName, 1193 this.hostnamesCanChange); 1194 return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1195 BlockingRpcChannel channel = 1196 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); 1197 return AdminProtos.AdminService.newBlockingStub(channel); 1198 }); 1199 } 1200 1201 @Override 1202 public BlockingInterface getClient(ServerName serverName) throws IOException { 1203 checkClosed(); 1204 if (isDeadServer(serverName)) { 1205 throw new RegionServerStoppedException(serverName + " is dead."); 1206 } 1207 String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), 1208 serverName, this.hostnamesCanChange); 1209 return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1210 BlockingRpcChannel channel = 1211 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); 1212 return ClientProtos.ClientService.newBlockingStub(channel); 1213 }); 1214 } 1215 1216 final MasterServiceState masterServiceState = new MasterServiceState(this); 1217 1218 @Override 1219 public MasterKeepAliveConnection getMaster() throws IOException { 1220 return getKeepAliveMasterService(); 1221 } 1222 1223 private void resetMasterServiceState(final MasterServiceState mss) { 1224 mss.userCount++; 1225 } 1226 1227 private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException { 1228 synchronized (masterLock) { 1229 if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) { 1230 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker(); 1231 this.masterServiceState.stub = stubMaker.makeStub(); 1232 } 1233 resetMasterServiceState(this.masterServiceState); 1234 } 1235 // Ugly delegation just so we can add in a Close method. 
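    // Caller-side sketch, mirroring isMasterRunning() above (nothing new; exception handling
    // elided): obtain the keep-alive wrapper, issue master RPCs through it, then close() it
    // when done so the shared stub is released, e.g.
    //   MasterKeepAliveConnection master = connection.getMaster();
    //   try {
    //     master.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
    //   } finally {
    //     master.close();
    //   }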
1236 final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub; 1237 return new MasterKeepAliveConnection() { 1238 MasterServiceState mss = masterServiceState; 1239 1240 @Override 1241 public MasterProtos.AbortProcedureResponse abortProcedure( 1242 RpcController controller, 1243 MasterProtos.AbortProcedureRequest request) throws ServiceException { 1244 return stub.abortProcedure(controller, request); 1245 } 1246 1247 @Override 1248 public MasterProtos.GetProceduresResponse getProcedures( 1249 RpcController controller, 1250 MasterProtos.GetProceduresRequest request) throws ServiceException { 1251 return stub.getProcedures(controller, request); 1252 } 1253 1254 @Override 1255 public MasterProtos.GetLocksResponse getLocks( 1256 RpcController controller, 1257 MasterProtos.GetLocksRequest request) throws ServiceException { 1258 return stub.getLocks(controller, request); 1259 } 1260 1261 @Override 1262 public MasterProtos.AddColumnResponse addColumn( 1263 RpcController controller, 1264 MasterProtos.AddColumnRequest request) throws ServiceException { 1265 return stub.addColumn(controller, request); 1266 } 1267 1268 @Override 1269 public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller, 1270 MasterProtos.DeleteColumnRequest request) 1271 throws ServiceException { 1272 return stub.deleteColumn(controller, request); 1273 } 1274 1275 @Override 1276 public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller, 1277 MasterProtos.ModifyColumnRequest request) 1278 throws ServiceException { 1279 return stub.modifyColumn(controller, request); 1280 } 1281 1282 @Override 1283 public MasterProtos.MoveRegionResponse moveRegion(RpcController controller, 1284 MasterProtos.MoveRegionRequest request) throws ServiceException { 1285 return stub.moveRegion(controller, request); 1286 } 1287 1288 @Override 1289 public MasterProtos.MergeTableRegionsResponse mergeTableRegions( 1290 RpcController controller, MasterProtos.MergeTableRegionsRequest request) 1291 throws ServiceException { 1292 return stub.mergeTableRegions(controller, request); 1293 } 1294 1295 @Override 1296 public MasterProtos.AssignRegionResponse assignRegion(RpcController controller, 1297 MasterProtos.AssignRegionRequest request) throws ServiceException { 1298 return stub.assignRegion(controller, request); 1299 } 1300 1301 @Override 1302 public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller, 1303 MasterProtos.UnassignRegionRequest request) throws ServiceException { 1304 return stub.unassignRegion(controller, request); 1305 } 1306 1307 @Override 1308 public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller, 1309 MasterProtos.OfflineRegionRequest request) throws ServiceException { 1310 return stub.offlineRegion(controller, request); 1311 } 1312 1313 @Override 1314 public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller, 1315 MasterProtos.SplitTableRegionRequest request) throws ServiceException { 1316 return stub.splitRegion(controller, request); 1317 } 1318 1319 @Override 1320 public MasterProtos.DeleteTableResponse deleteTable(RpcController controller, 1321 MasterProtos.DeleteTableRequest request) throws ServiceException { 1322 return stub.deleteTable(controller, request); 1323 } 1324 1325 @Override 1326 public MasterProtos.TruncateTableResponse truncateTable(RpcController controller, 1327 MasterProtos.TruncateTableRequest request) throws ServiceException { 1328 return stub.truncateTable(controller, request); 
1329 } 1330 1331 @Override 1332 public MasterProtos.EnableTableResponse enableTable(RpcController controller, 1333 MasterProtos.EnableTableRequest request) throws ServiceException { 1334 return stub.enableTable(controller, request); 1335 } 1336 1337 @Override 1338 public MasterProtos.DisableTableResponse disableTable(RpcController controller, 1339 MasterProtos.DisableTableRequest request) throws ServiceException { 1340 return stub.disableTable(controller, request); 1341 } 1342 1343 @Override 1344 public MasterProtos.ModifyTableResponse modifyTable(RpcController controller, 1345 MasterProtos.ModifyTableRequest request) throws ServiceException { 1346 return stub.modifyTable(controller, request); 1347 } 1348 1349 @Override 1350 public MasterProtos.CreateTableResponse createTable(RpcController controller, 1351 MasterProtos.CreateTableRequest request) throws ServiceException { 1352 return stub.createTable(controller, request); 1353 } 1354 1355 @Override 1356 public MasterProtos.ShutdownResponse shutdown(RpcController controller, 1357 MasterProtos.ShutdownRequest request) throws ServiceException { 1358 return stub.shutdown(controller, request); 1359 } 1360 1361 @Override 1362 public MasterProtos.StopMasterResponse stopMaster(RpcController controller, 1363 MasterProtos.StopMasterRequest request) throws ServiceException { 1364 return stub.stopMaster(controller, request); 1365 } 1366 1367 @Override 1368 public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode( 1369 final RpcController controller, 1370 final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException { 1371 return stub.isMasterInMaintenanceMode(controller, request); 1372 } 1373 1374 @Override 1375 public MasterProtos.BalanceResponse balance(RpcController controller, 1376 MasterProtos.BalanceRequest request) throws ServiceException { 1377 return stub.balance(controller, request); 1378 } 1379 1380 @Override 1381 public MasterProtos.SetBalancerRunningResponse setBalancerRunning( 1382 RpcController controller, MasterProtos.SetBalancerRunningRequest request) 1383 throws ServiceException { 1384 return stub.setBalancerRunning(controller, request); 1385 } 1386 1387 @Override 1388 public NormalizeResponse normalize(RpcController controller, 1389 NormalizeRequest request) throws ServiceException { 1390 return stub.normalize(controller, request); 1391 } 1392 1393 @Override 1394 public SetNormalizerRunningResponse setNormalizerRunning( 1395 RpcController controller, SetNormalizerRunningRequest request) 1396 throws ServiceException { 1397 return stub.setNormalizerRunning(controller, request); 1398 } 1399 1400 @Override 1401 public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller, 1402 MasterProtos.RunCatalogScanRequest request) throws ServiceException { 1403 return stub.runCatalogScan(controller, request); 1404 } 1405 1406 @Override 1407 public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor( 1408 RpcController controller, MasterProtos.EnableCatalogJanitorRequest request) 1409 throws ServiceException { 1410 return stub.enableCatalogJanitor(controller, request); 1411 } 1412 1413 @Override 1414 public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled( 1415 RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request) 1416 throws ServiceException { 1417 return stub.isCatalogJanitorEnabled(controller, request); 1418 } 1419 1420 @Override 1421 public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller, 1422 
MasterProtos.RunCleanerChoreRequest request) 1423 throws ServiceException { 1424 return stub.runCleanerChore(controller, request); 1425 } 1426 1427 @Override 1428 public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning( 1429 RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request) 1430 throws ServiceException { 1431 return stub.setCleanerChoreRunning(controller, request); 1432 } 1433 1434 @Override 1435 public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled( 1436 RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request) 1437 throws ServiceException { 1438 return stub.isCleanerChoreEnabled(controller, request); 1439 } 1440 1441 @Override 1442 public ClientProtos.CoprocessorServiceResponse execMasterService( 1443 RpcController controller, ClientProtos.CoprocessorServiceRequest request) 1444 throws ServiceException { 1445 return stub.execMasterService(controller, request); 1446 } 1447 1448 @Override 1449 public MasterProtos.SnapshotResponse snapshot(RpcController controller, 1450 MasterProtos.SnapshotRequest request) throws ServiceException { 1451 return stub.snapshot(controller, request); 1452 } 1453 1454 @Override 1455 public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots( 1456 RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request) 1457 throws ServiceException { 1458 return stub.getCompletedSnapshots(controller, request); 1459 } 1460 1461 @Override 1462 public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller, 1463 MasterProtos.DeleteSnapshotRequest request) throws ServiceException { 1464 return stub.deleteSnapshot(controller, request); 1465 } 1466 1467 @Override 1468 public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller, 1469 MasterProtos.IsSnapshotDoneRequest request) throws ServiceException { 1470 return stub.isSnapshotDone(controller, request); 1471 } 1472 1473 @Override 1474 public MasterProtos.RestoreSnapshotResponse restoreSnapshot( 1475 RpcController controller, MasterProtos.RestoreSnapshotRequest request) 1476 throws ServiceException { 1477 return stub.restoreSnapshot(controller, request); 1478 } 1479 1480 @Override 1481 public MasterProtos.ExecProcedureResponse execProcedure( 1482 RpcController controller, MasterProtos.ExecProcedureRequest request) 1483 throws ServiceException { 1484 return stub.execProcedure(controller, request); 1485 } 1486 1487 @Override 1488 public MasterProtos.ExecProcedureResponse execProcedureWithRet( 1489 RpcController controller, MasterProtos.ExecProcedureRequest request) 1490 throws ServiceException { 1491 return stub.execProcedureWithRet(controller, request); 1492 } 1493 1494 @Override 1495 public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller, 1496 MasterProtos.IsProcedureDoneRequest request) throws ServiceException { 1497 return stub.isProcedureDone(controller, request); 1498 } 1499 1500 @Override 1501 public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller, 1502 MasterProtos.GetProcedureResultRequest request) throws ServiceException { 1503 return stub.getProcedureResult(controller, request); 1504 } 1505 1506 @Override 1507 public MasterProtos.IsMasterRunningResponse isMasterRunning( 1508 RpcController controller, MasterProtos.IsMasterRunningRequest request) 1509 throws ServiceException { 1510 return stub.isMasterRunning(controller, request); 1511 } 1512 1513 @Override 1514 public MasterProtos.ModifyNamespaceResponse 
          MasterProtos.ModifyNamespaceRequest request) throws ServiceException {
        return stub.modifyNamespace(controller, request);
      }

      @Override
      public MasterProtos.CreateNamespaceResponse createNamespace(
          RpcController controller,
          MasterProtos.CreateNamespaceRequest request) throws ServiceException {
        return stub.createNamespace(controller, request);
      }

      @Override
      public MasterProtos.DeleteNamespaceResponse deleteNamespace(
          RpcController controller,
          MasterProtos.DeleteNamespaceRequest request) throws ServiceException {
        return stub.deleteNamespace(controller, request);
      }

      @Override
      public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor(
          RpcController controller,
          MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException {
        return stub.getNamespaceDescriptor(controller, request);
      }

      @Override
      public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors(
          RpcController controller,
          MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException {
        return stub.listNamespaceDescriptors(controller, request);
      }

      @Override
      public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
          RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request)
          throws ServiceException {
        return stub.listTableDescriptorsByNamespace(controller, request);
      }

      @Override
      public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace(
          RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request)
          throws ServiceException {
        return stub.listTableNamesByNamespace(controller, request);
      }

      @Override
      public MasterProtos.GetTableStateResponse getTableState(
          RpcController controller, MasterProtos.GetTableStateRequest request)
          throws ServiceException {
        return stub.getTableState(controller, request);
      }

      @Override
      public void close() {
        release(this.mss);
      }

      @Override
      public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus(
          RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request)
          throws ServiceException {
        return stub.getSchemaAlterStatus(controller, request);
      }

      @Override
      public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(
          RpcController controller, MasterProtos.GetTableDescriptorsRequest request)
          throws ServiceException {
        return stub.getTableDescriptors(controller, request);
      }

      @Override
      public MasterProtos.GetTableNamesResponse getTableNames(
          RpcController controller, MasterProtos.GetTableNamesRequest request)
          throws ServiceException {
        return stub.getTableNames(controller, request);
      }

      @Override
      public MasterProtos.GetClusterStatusResponse getClusterStatus(
          RpcController controller, MasterProtos.GetClusterStatusRequest request)
          throws ServiceException {
        return stub.getClusterStatus(controller, request);
      }

      @Override
      public MasterProtos.SetQuotaResponse setQuota(
          RpcController controller, MasterProtos.SetQuotaRequest request)
          throws ServiceException {
        return stub.setQuota(controller, request);
      }

      @Override
      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(
          RpcController controller, MasterProtos.MajorCompactionTimestampRequest request)
          throws ServiceException {
        return stub.getLastMajorCompactionTimestamp(controller, request);
      }

      @Override
      public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion(
          RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request)
          throws ServiceException {
        return stub.getLastMajorCompactionTimestampForRegion(controller, request);
      }

      @Override
      public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller,
          IsBalancerEnabledRequest request) throws ServiceException {
        return stub.isBalancerEnabled(controller, request);
      }

      @Override
      public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(
          RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request)
          throws ServiceException {
        return stub.setSplitOrMergeEnabled(controller, request);
      }

      @Override
      public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled(
          RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request)
          throws ServiceException {
        return stub.isSplitOrMergeEnabled(controller, request);
      }

      @Override
      public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
          IsNormalizerEnabledRequest request) throws ServiceException {
        return stub.isNormalizerEnabled(controller, request);
      }

      @Override
      public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller,
          SecurityCapabilitiesRequest request) throws ServiceException {
        return stub.getSecurityCapabilities(controller, request);
      }

      @Override
      public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
          AddReplicationPeerRequest request) throws ServiceException {
        return stub.addReplicationPeer(controller, request);
      }

      @Override
      public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller,
          RemoveReplicationPeerRequest request) throws ServiceException {
        return stub.removeReplicationPeer(controller, request);
      }

      @Override
      public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller,
          EnableReplicationPeerRequest request) throws ServiceException {
        return stub.enableReplicationPeer(controller, request);
      }

      @Override
      public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
          DisableReplicationPeerRequest request) throws ServiceException {
        return stub.disableReplicationPeer(controller, request);
      }

      @Override
      public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
          RpcController controller,
          ListDecommissionedRegionServersRequest request) throws ServiceException {
        return stub.listDecommissionedRegionServers(controller, request);
      }

      @Override
      public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller,
          DecommissionRegionServersRequest request) throws ServiceException {
        return stub.decommissionRegionServers(controller, request);
      }

      @Override
      public RecommissionRegionServerResponse recommissionRegionServer(
          RpcController controller,
          RecommissionRegionServerRequest request) throws ServiceException {
        return stub.recommissionRegionServer(controller, request);
      }

      @Override
      public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
          GetReplicationPeerConfigRequest request) throws ServiceException {
        return stub.getReplicationPeerConfig(controller, request);
      }

      @Override
      public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig(
          RpcController controller, UpdateReplicationPeerConfigRequest request)
          throws ServiceException {
        return stub.updateReplicationPeerConfig(controller, request);
      }

      @Override
      public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
          ListReplicationPeersRequest request) throws ServiceException {
        return stub.listReplicationPeers(controller, request);
      }

      @Override
      public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(
          RpcController controller, GetSpaceQuotaRegionSizesRequest request)
          throws ServiceException {
        return stub.getSpaceQuotaRegionSizes(controller, request);
      }

      @Override
      public GetQuotaStatesResponse getQuotaStates(
          RpcController controller, GetQuotaStatesRequest request) throws ServiceException {
        return stub.getQuotaStates(controller, request);
      }

      @Override
      public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller,
          MasterProtos.ClearDeadServersRequest request) throws ServiceException {
        return stub.clearDeadServers(controller, request);
      }
    };
  }

  private static void release(MasterServiceState mss) {
    if (mss != null && mss.connection != null) {
      ((ConnectionImplementation) mss.connection).releaseMaster(mss);
    }
  }

  private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
    if (mss.getStub() == null) {
      return false;
    }
    try {
      return mss.isMasterRunning();
    } catch (UndeclaredThrowableException e) {
      // It's somewhat messy, but we can receive exceptions such as
      // java.net.ConnectException that are not declared, so we catch them here.
      LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
      return false;
    } catch (IOException se) {
      LOG.warn("Checking master connection", se);
      return false;
    }
  }
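
  // Note on the UndeclaredThrowableException handling above: dynamic proxies and reflective
  // stubs wrap checked exceptions that are not declared on the invoked interface method
  // (e.g. a java.net.ConnectException) in java.lang.reflect.UndeclaredThrowableException.
  // A minimal, hypothetical sketch of unwrapping such a failure (proxy.ping() is illustrative
  // only, not an API of this class):
  //
  //   try {
  //     proxy.ping();                                // interface method declares no IOException
  //   } catch (UndeclaredThrowableException e) {
  //     Throwable real = e.getUndeclaredThrowable(); // e.g. the underlying ConnectException
  //     // treat the connection as dead and let the caller rebuild the stub
  //   }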

  void releaseMaster(MasterServiceState mss) {
    if (mss.getStub() == null) {
      return;
    }
    synchronized (masterLock) {
      --mss.userCount;
    }
  }

  private void closeMasterService(MasterServiceState mss) {
    if (mss.getStub() != null) {
      LOG.info("Closing master protocol: " + mss);
      mss.clearStub();
    }
    mss.userCount = 0;
  }

  /**
   * Immediate close of the shared master. Can be called by the delayed close or when closing
   * the connection itself.
   */
  private void closeMaster() {
    synchronized (masterLock) {
      closeMasterService(masterServiceState);
    }
  }

  void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName,
      long seqNum) {
    HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
    cacheLocation(hri.getTable(), source, newHrl);
  }

  @Override
  public void deleteCachedRegionLocation(final HRegionLocation location) {
    metaCache.clearCache(location);
  }

  /**
   * Update the location with the new value (if the exception is a RegionMovedException)
   * or delete it from the cache. Does nothing if we can be sure from the exception that
   * the location is still accurate, or if the cache has already been updated.
   * @param exception an object (to simplify user code) on which we will try to find a nested
   *   or wrapped RegionMovedException (or both)
   * @param source server that is the source of the location update.
   */
  @Override
  public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey,
      final Object exception, final ServerName source) {
    if (rowkey == null || tableName == null) {
      LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
          ", tableName=" + (tableName == null ? "null" : tableName));
      return;
    }

    if (source == null) {
      // This should not happen, but let's secure ourselves.
      return;
    }

    if (regionName == null) {
      // We do not know which region, so just remove the cache entry for the row and server.
      if (metrics != null) {
        metrics.incrCacheDroppingExceptions(exception);
      }
      metaCache.clearCache(tableName, rowkey, source);
      return;
    }

    // Is it something we have already updated?
    final RegionLocations oldLocations = getCachedLocation(tableName, rowkey);
    HRegionLocation oldLocation = null;
    if (oldLocations != null) {
      oldLocation = oldLocations.getRegionLocationByRegionName(regionName);
    }
    if (oldLocation == null || !source.equals(oldLocation.getServerName())) {
      // There is no such location in the cache (it's been removed already) or
      // the cache has already been refreshed with a different location. => nothing to do
      return;
    }

    RegionInfo regionInfo = oldLocation.getRegion();
    Throwable cause = ClientExceptionsUtil.findException(exception);
    if (cause != null) {
      if (!ClientExceptionsUtil.isMetaClearingException(cause)) {
        // We know that the region is still on this region server.
        return;
      }

      if (cause instanceof RegionMovedException) {
        RegionMovedException rme = (RegionMovedException) cause;
        if (LOG.isTraceEnabled()) {
          LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " +
              rme.getHostname() + ":" + rme.getPort() +
              " according to " + source.getAddress());
        }
        // We know that the region is no longer on this region server, but we know
        // the new location.
        updateCachedLocation(
            regionInfo, source, rme.getServerName(), rme.getLocationSeqNum());
        return;
      }
    }

    if (metrics != null) {
      metrics.incrCacheDroppingExceptions(exception);
    }

    // If we're here, it means that we cannot be sure about the location, so we remove it from
    // the cache. Do not send the source because source can be a new server in the same host:port.
    metaCache.clearCache(regionInfo);
  }
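
  // Usage sketch for updateCachedLocations(): client code that has just seen an RPC against a
  // region fail can hand the raw exception back to the connection so the cached location is
  // either refreshed (RegionMovedException) or dropped. The names below (conn, tableName,
  // regionName, row, source) are hypothetical caller-side variables, not part of this class.
  //
  //   try {
  //     // ... issue the request against the region believed to live on 'source' ...
  //   } catch (IOException ioe) {
  //     conn.updateCachedLocations(tableName, regionName, row, ioe, source);
  //     // a subsequent retry will re-resolve the region location
  //   }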

  @Override
  public AsyncProcess getAsyncProcess() {
    return asyncProcess;
  }

  @Override
  public ServerStatisticTracker getStatisticsTracker() {
    return this.stats;
  }

  @Override
  public ClientBackoffPolicy getBackoffPolicy() {
    return this.backoffPolicy;
  }

  /*
   * Return the number of cached regions for a table. It will only be called
   * from a unit test.
   */
  @VisibleForTesting
  int getNumberOfCachedRegionLocations(final TableName tableName) {
    return metaCache.getNumberOfCachedRegionLocations(tableName);
  }

  @Override
  public void abort(final String msg, Throwable t) {
    if (t != null) {
      LOG.error(HBaseMarkers.FATAL, msg, t);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }
    this.aborted = true;
    close();
    this.closed = true;
  }

  @Override
  public boolean isClosed() {
    return this.closed;
  }

  @Override
  public boolean isAborted() {
    return this.aborted;
  }

  @Override
  public int getCurrentNrHRS() throws IOException {
    return get(this.registry.getCurrentNrHRS());
  }

  @Override
  public void close() {
    if (this.closed) {
      return;
    }
    closeMaster();
    shutdownPools();
    if (this.metrics != null) {
      this.metrics.shutdown();
    }
    this.closed = true;
    registry.close();
    this.stubs.clear();
    if (clusterStatusListener != null) {
      clusterStatusListener.close();
    }
    if (rpcClient != null) {
      rpcClient.close();
    }
  }
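
  // close() above tears down the master stub, thread pools, metrics, registry, cached stubs and
  // the RPC client. A minimal caller-side sketch of the intended lifecycle, assuming the usual
  // ConnectionFactory entry point (illustrative only):
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf)) {
  //     // use Table / Admin instances obtained from conn
  //   } // close() runs here; the finalize() below is only a last-resort safety net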

  /**
   * Close the connection for good. On the off chance that someone is unable to close
   * the connection, perhaps because it bailed out prematurely, the method
   * below will ensure that this instance is cleaned up.
   * Caveat: The JVM may take an unknown amount of time to call finalize on an
   * unreachable object, so our hope is that every consumer cleans up after
   * itself, like any good citizen.
   */
  @Override
  protected void finalize() throws Throwable {
    super.finalize();
    close();
  }

  @Override
  public NonceGenerator getNonceGenerator() {
    return nonceGenerator;
  }

  @Override
  public TableState getTableState(TableName tableName) throws IOException {
    checkClosed();
    TableState tableState = MetaTableAccessor.getTableState(this, tableName);
    if (tableState == null) {
      throw new TableNotFoundException(tableName);
    }
    return tableState;
  }

  @Override
  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
    return RpcRetryingCallerFactory
        .instantiate(conf, this.interceptor, this.getStatisticsTracker());
  }

  @Override
  public boolean hasCellBlockSupport() {
    return this.rpcClient.hasCellBlockSupport();
  }

  @Override
  public ConnectionConfiguration getConnectionConfiguration() {
    return this.connectionConfig;
  }

  @Override
  public RpcRetryingCallerFactory getRpcRetryingCallerFactory() {
    return this.rpcCallerFactory;
  }

  @Override
  public RpcControllerFactory getRpcControllerFactory() {
    return this.rpcControllerFactory;
  }

  private static <T> T get(CompletableFuture<T> future) throws IOException {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      Throwables.propagateIfPossible(cause, IOException.class);
      throw new IOException(cause);
    }
  }
}
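
// The private get(CompletableFuture) helper above bridges the asynchronous cluster registry and
// this synchronous connection: it blocks on the future, restores the interrupt flag on
// InterruptedException, and rethrows the cause of an ExecutionException as an IOException
// (Throwables.propagateIfPossible rethrows the cause as-is when it already is an IOException).
// A standalone sketch of the same pattern, with a hypothetical name and an explicit instanceof
// check in place of the Guava helper:
//
//   static <T> T awaitAsIOException(CompletableFuture<T> future) throws IOException {
//     try {
//       return future.get();
//     } catch (InterruptedException e) {
//       Thread.currentThread().interrupt();
//       throw (IOException) new InterruptedIOException().initCause(e);
//     } catch (ExecutionException e) {
//       Throwable cause = e.getCause();
//       if (cause instanceof IOException) {
//         throw (IOException) cause;
//       }
//       throw new IOException(cause);
//     }
//   }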