001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 022import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 023import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 024import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 025import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; 026import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsentEx; 027 028import edu.umd.cs.findbugs.annotations.Nullable; 029import java.io.Closeable; 030import java.io.IOException; 031import java.io.InterruptedIOException; 032import java.lang.reflect.UndeclaredThrowableException; 033import java.util.ArrayList; 034import java.util.Collections; 035import java.util.Date; 036import java.util.List; 037import java.util.concurrent.BlockingQueue; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.ConcurrentHashMap; 040import java.util.concurrent.ConcurrentMap; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.ExecutorService; 043import java.util.concurrent.LinkedBlockingQueue; 044import java.util.concurrent.ThreadPoolExecutor; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.atomic.AtomicInteger; 047import java.util.concurrent.locks.ReentrantLock; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.hbase.CallQueueTooBigException; 050import org.apache.hadoop.hbase.DoNotRetryIOException; 051import org.apache.hadoop.hbase.HConstants; 052import org.apache.hadoop.hbase.HRegionLocation; 053import org.apache.hadoop.hbase.MasterNotRunningException; 054import org.apache.hadoop.hbase.MetaTableAccessor; 055import org.apache.hadoop.hbase.RegionLocations; 056import org.apache.hadoop.hbase.ServerName; 057import org.apache.hadoop.hbase.TableName; 058import org.apache.hadoop.hbase.TableNotEnabledException; 059import org.apache.hadoop.hbase.TableNotFoundException; 060import org.apache.hadoop.hbase.ZooKeeperConnectionException; 061import org.apache.hadoop.hbase.client.Scan.ReadType; 062import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 063import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 064import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 065import org.apache.hadoop.hbase.exceptions.RegionMovedException; 066import org.apache.hadoop.hbase.ipc.RpcClient; 067import org.apache.hadoop.hbase.ipc.RpcClientFactory; 068import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 069import org.apache.hadoop.hbase.log.HBaseMarkers; 070import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 071import org.apache.hadoop.hbase.security.User; 072import org.apache.hadoop.hbase.util.Bytes; 073import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 074import org.apache.hadoop.hbase.util.ExceptionUtil; 075import org.apache.hadoop.hbase.util.Pair; 076import org.apache.hadoop.hbase.util.ReflectionUtils; 077import org.apache.hadoop.hbase.util.Threads; 078import org.apache.hadoop.ipc.RemoteException; 079import org.apache.yetus.audience.InterfaceAudience; 080import org.apache.zookeeper.KeeperException; 081import org.slf4j.Logger; 082import org.slf4j.LoggerFactory; 083 084import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 085import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 086import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; 087import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 088import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 089import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 090import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 098import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 117import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 129 130/** 131 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. 132 * Encapsulates connection to zookeeper and regionservers. 133 */ 134@edu.umd.cs.findbugs.annotations.SuppressWarnings( 135 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", 136 justification="Access to the conncurrent hash map is under a lock so should be fine.") 137@InterfaceAudience.Private 138class ConnectionImplementation implements ClusterConnection, Closeable { 139 public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; 140 private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class); 141 142 private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; 143 144 private final boolean hostnamesCanChange; 145 private final long pause; 146 private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified 147 private boolean useMetaReplicas; 148 private final int metaReplicaCallTimeoutScanInMicroSecond; 149 private final int numTries; 150 final int rpcTimeout; 151 152 /** 153 * Global nonceGenerator shared per client.Currently there's no reason to limit its scope. 154 * Once it's set under nonceGeneratorCreateLock, it is never unset or changed. 155 */ 156 private static volatile NonceGenerator nonceGenerator = null; 157 /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */ 158 private static final Object nonceGeneratorCreateLock = new Object(); 159 160 private final AsyncProcess asyncProcess; 161 // single tracker per connection 162 private final ServerStatisticTracker stats; 163 164 private volatile boolean closed; 165 private volatile boolean aborted; 166 167 // package protected for the tests 168 ClusterStatusListener clusterStatusListener; 169 170 private final Object metaRegionLock = new Object(); 171 172 private final Object masterLock = new Object(); 173 174 // thread executor shared by all Table instances created 175 // by this connection 176 private volatile ExecutorService batchPool = null; 177 // meta thread executor shared by all Table instances created 178 // by this connection 179 private volatile ExecutorService metaLookupPool = null; 180 private volatile boolean cleanupPool = false; 181 182 private final Configuration conf; 183 184 // cache the configuration value for tables so that we can avoid calling 185 // the expensive Configuration to fetch the value multiple times. 186 private final ConnectionConfiguration connectionConfig; 187 188 // Client rpc instance. 189 private final RpcClient rpcClient; 190 191 private final MetaCache metaCache; 192 private final MetricsConnection metrics; 193 194 protected User user; 195 196 private final RpcRetryingCallerFactory rpcCallerFactory; 197 198 private final RpcControllerFactory rpcControllerFactory; 199 200 private final RetryingCallerInterceptor interceptor; 201 202 /** 203 * Cluster registry of basic info such as clusterid and meta region location. 204 */ 205 private final AsyncRegistry registry; 206 207 private final ClientBackoffPolicy backoffPolicy; 208 209 /** 210 * Allow setting an alternate BufferedMutator implementation via 211 * config. If null, use default. 212 */ 213 private final String alternateBufferedMutatorClassName; 214 215 /** lock guards against multiple threads trying to query the meta region at the same time */ 216 private final ReentrantLock userRegionLock = new ReentrantLock(); 217 218 /** 219 * constructor 220 * @param conf Configuration object 221 */ 222 ConnectionImplementation(Configuration conf, 223 ExecutorService pool, User user) throws IOException { 224 this.conf = conf; 225 this.user = user; 226 this.batchPool = pool; 227 this.connectionConfig = new ConnectionConfiguration(conf); 228 this.closed = false; 229 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 230 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 231 long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); 232 if (configuredPauseForCQTBE < pause) { 233 LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " 234 + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE 235 + ", will use " + pause + " instead."); 236 this.pauseForCQTBE = pause; 237 } else { 238 this.pauseForCQTBE = configuredPauseForCQTBE; 239 } 240 this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS, 241 HConstants.DEFAULT_USE_META_REPLICAS); 242 this.metaReplicaCallTimeoutScanInMicroSecond = 243 connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan(); 244 245 // how many times to try, one more than max *retry* time 246 this.numTries = retries2Attempts(connectionConfig.getRetriesNumber()); 247 this.rpcTimeout = conf.getInt( 248 HConstants.HBASE_RPC_TIMEOUT_KEY, 249 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 250 if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) { 251 synchronized (nonceGeneratorCreateLock) { 252 if (nonceGenerator == null) { 253 nonceGenerator = PerClientRandomNonceGenerator.get(); 254 } 255 } 256 } else { 257 nonceGenerator = NO_NONCE_GENERATOR; 258 } 259 260 this.stats = ServerStatisticTracker.create(conf); 261 this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); 262 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 263 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); 264 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); 265 this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); 266 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { 267 this.metrics = new MetricsConnection(this); 268 } else { 269 this.metrics = null; 270 } 271 this.metaCache = new MetaCache(this.metrics); 272 273 boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, 274 HConstants.STATUS_PUBLISHED_DEFAULT); 275 this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); 276 Class<? extends ClusterStatusListener.Listener> listenerClass = 277 conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, 278 ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, 279 ClusterStatusListener.Listener.class); 280 281 // Is there an alternate BufferedMutator to use? 282 this.alternateBufferedMutatorClassName = 283 this.conf.get(BufferedMutator.CLASSNAME_KEY); 284 285 try { 286 this.registry = AsyncRegistryFactory.getRegistry(conf); 287 retrieveClusterId(); 288 289 this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); 290 291 // Do we publish the status? 292 if (shouldListen) { 293 if (listenerClass == null) { 294 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + 295 ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); 296 } else { 297 clusterStatusListener = new ClusterStatusListener( 298 new ClusterStatusListener.DeadServerHandler() { 299 @Override 300 public void newDead(ServerName sn) { 301 clearCaches(sn); 302 rpcClient.cancelConnections(sn); 303 } 304 }, conf, listenerClass); 305 } 306 } 307 } catch (Throwable e) { 308 // avoid leaks: registry, rpcClient, ... 309 LOG.debug("connection construction failed", e); 310 close(); 311 throw e; 312 } 313 } 314 315 /** 316 * @param useMetaReplicas 317 */ 318 @VisibleForTesting 319 void setUseMetaReplicas(final boolean useMetaReplicas) { 320 this.useMetaReplicas = useMetaReplicas; 321 } 322 323 /** 324 * @param conn The connection for which to replace the generator. 325 * @param cnm Replaces the nonce generator used, for testing. 326 * @return old nonce generator. 327 */ 328 @VisibleForTesting 329 static NonceGenerator injectNonceGeneratorForTesting( 330 ClusterConnection conn, NonceGenerator cnm) { 331 ConnectionImplementation connImpl = (ConnectionImplementation)conn; 332 NonceGenerator ng = connImpl.getNonceGenerator(); 333 LOG.warn("Nonce generator is being replaced by test code for " 334 + cnm.getClass().getName()); 335 nonceGenerator = cnm; 336 return ng; 337 } 338 339 @Override 340 public Table getTable(TableName tableName) throws IOException { 341 return getTable(tableName, getBatchPool()); 342 } 343 344 @Override 345 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 346 return new TableBuilderBase(tableName, connectionConfig) { 347 348 @Override 349 public Table build() { 350 return new HTable(ConnectionImplementation.this, this, rpcCallerFactory, 351 rpcControllerFactory, pool); 352 } 353 }; 354 } 355 356 @Override 357 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) { 358 if (params.getTableName() == null) { 359 throw new IllegalArgumentException("TableName cannot be null."); 360 } 361 if (params.getPool() == null) { 362 params.pool(HTable.getDefaultExecutor(getConfiguration())); 363 } 364 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { 365 params.writeBufferSize(connectionConfig.getWriteBufferSize()); 366 } 367 if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) { 368 params.setWriteBufferPeriodicFlushTimeoutMs( 369 connectionConfig.getWriteBufferPeriodicFlushTimeoutMs()); 370 } 371 if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) { 372 params.setWriteBufferPeriodicFlushTimerTickMs( 373 connectionConfig.getWriteBufferPeriodicFlushTimerTickMs()); 374 } 375 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { 376 params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); 377 } 378 // Look to see if an alternate BufferedMutation implementation is wanted. 379 // Look in params and in config. If null, use default. 380 String implementationClassName = params.getImplementationClassName(); 381 if (implementationClassName == null) { 382 implementationClassName = this.alternateBufferedMutatorClassName; 383 } 384 if (implementationClassName == null) { 385 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); 386 } 387 try { 388 return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName), 389 this, rpcCallerFactory, rpcControllerFactory, params); 390 } catch (ClassNotFoundException e) { 391 throw new RuntimeException(e); 392 } 393 } 394 395 @Override 396 public BufferedMutator getBufferedMutator(TableName tableName) { 397 return getBufferedMutator(new BufferedMutatorParams(tableName)); 398 } 399 400 @Override 401 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 402 return new HRegionLocator(tableName, this); 403 } 404 405 @Override 406 public Admin getAdmin() throws IOException { 407 return new HBaseAdmin(this); 408 } 409 410 @Override 411 public Hbck getHbck() throws IOException { 412 return getHbck(get(registry.getMasterAddress())); 413 } 414 415 @Override 416 public Hbck getHbck(ServerName masterServer) throws IOException { 417 checkClosed(); 418 if (isDeadServer(masterServer)) { 419 throw new RegionServerStoppedException(masterServer + " is dead."); 420 } 421 String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), 422 masterServer, this.hostnamesCanChange); 423 424 return new HBaseHbck( 425 (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 426 BlockingRpcChannel channel = 427 this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout); 428 return MasterProtos.HbckService.newBlockingStub(channel); 429 }), rpcControllerFactory); 430 } 431 432 @Override 433 public MetricsConnection getConnectionMetrics() { 434 return this.metrics; 435 } 436 437 private ExecutorService getBatchPool() { 438 if (batchPool == null) { 439 synchronized (this) { 440 if (batchPool == null) { 441 int threads = conf.getInt("hbase.hconnection.threads.max", 256); 442 this.batchPool = getThreadPool(threads, threads, "-shared", null); 443 this.cleanupPool = true; 444 } 445 } 446 } 447 return this.batchPool; 448 } 449 450 private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint, 451 BlockingQueue<Runnable> passedWorkQueue) { 452 // shared HTable thread executor not yet initialized 453 if (maxThreads == 0) { 454 maxThreads = Runtime.getRuntime().availableProcessors() * 8; 455 } 456 if (coreThreads == 0) { 457 coreThreads = Runtime.getRuntime().availableProcessors() * 8; 458 } 459 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); 460 BlockingQueue<Runnable> workQueue = passedWorkQueue; 461 if (workQueue == null) { 462 workQueue = 463 new LinkedBlockingQueue<>(maxThreads * 464 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 465 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); 466 coreThreads = maxThreads; 467 } 468 ThreadPoolExecutor tpe = new ThreadPoolExecutor( 469 coreThreads, 470 maxThreads, 471 keepAliveTime, 472 TimeUnit.SECONDS, 473 workQueue, 474 Threads.newDaemonThreadFactory(toString() + nameHint)); 475 tpe.allowCoreThreadTimeOut(true); 476 return tpe; 477 } 478 479 private ExecutorService getMetaLookupPool() { 480 if (this.metaLookupPool == null) { 481 synchronized (this) { 482 if (this.metaLookupPool == null) { 483 //Some of the threads would be used for meta replicas 484 //To start with, threads.max.core threads can hit the meta (including replicas). 485 //After that, requests will get queued up in the passed queue, and only after 486 //the queue is full, a new thread will be started 487 int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128); 488 this.metaLookupPool = getThreadPool( 489 threads, 490 threads, 491 "-metaLookup-shared-", new LinkedBlockingQueue<>()); 492 } 493 } 494 } 495 return this.metaLookupPool; 496 } 497 498 protected ExecutorService getCurrentMetaLookupPool() { 499 return metaLookupPool; 500 } 501 502 protected ExecutorService getCurrentBatchPool() { 503 return batchPool; 504 } 505 506 private void shutdownPools() { 507 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { 508 shutdownBatchPool(this.batchPool); 509 } 510 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) { 511 shutdownBatchPool(this.metaLookupPool); 512 } 513 } 514 515 private void shutdownBatchPool(ExecutorService pool) { 516 pool.shutdown(); 517 try { 518 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { 519 pool.shutdownNow(); 520 } 521 } catch (InterruptedException e) { 522 pool.shutdownNow(); 523 } 524 } 525 526 /** 527 * For tests only. 528 */ 529 @VisibleForTesting 530 RpcClient getRpcClient() { 531 return rpcClient; 532 } 533 534 /** 535 * An identifier that will remain the same for a given connection. 536 */ 537 @Override 538 public String toString(){ 539 return "hconnection-0x" + Integer.toHexString(hashCode()); 540 } 541 542 protected String clusterId = null; 543 544 protected void retrieveClusterId() { 545 if (clusterId != null) { 546 return; 547 } 548 try { 549 this.clusterId = this.registry.getClusterId().get(); 550 } catch (InterruptedException | ExecutionException e) { 551 LOG.warn("Retrieve cluster id failed", e); 552 } 553 if (clusterId == null) { 554 clusterId = HConstants.CLUSTER_ID_DEFAULT; 555 LOG.debug("clusterid came back null, using default " + clusterId); 556 } 557 } 558 559 @Override 560 public Configuration getConfiguration() { 561 return this.conf; 562 } 563 564 private void checkClosed() throws DoNotRetryIOException { 565 if (this.closed) { 566 throw new DoNotRetryIOException(toString() + " closed"); 567 } 568 } 569 570 /** 571 * @return true if the master is running, throws an exception otherwise 572 * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running 573 * @deprecated this has been deprecated without a replacement 574 */ 575 @Deprecated 576 @Override 577 public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException { 578 // When getting the master connection, we check it's running, 579 // so if there is no exception, it means we've been able to get a 580 // connection on a running master 581 MasterKeepAliveConnection m; 582 try { 583 m = getKeepAliveMasterService(); 584 } catch (IOException e) { 585 throw new MasterNotRunningException(e); 586 } 587 m.close(); 588 return true; 589 } 590 591 @Override 592 public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row, 593 boolean reload) throws IOException { 594 return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row); 595 } 596 597 598 @Override 599 public boolean isTableEnabled(TableName tableName) throws IOException { 600 return getTableState(tableName).inStates(TableState.State.ENABLED); 601 } 602 603 @Override 604 public boolean isTableDisabled(TableName tableName) throws IOException { 605 return getTableState(tableName).inStates(TableState.State.DISABLED); 606 } 607 608 @Override 609 public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys) 610 throws IOException { 611 checkClosed(); 612 try { 613 if (!isTableEnabled(tableName)) { 614 LOG.debug("Table {} not enabled", tableName); 615 return false; 616 } 617 if (TableName.isMetaTableName(tableName)) { 618 // meta table is always available 619 return true; 620 } 621 List<Pair<RegionInfo, ServerName>> locations = 622 MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true); 623 624 int notDeployed = 0; 625 int regionCount = 0; 626 for (Pair<RegionInfo, ServerName> pair : locations) { 627 RegionInfo info = pair.getFirst(); 628 if (pair.getSecond() == null) { 629 LOG.debug("Table {} has not deployed region {}", tableName, 630 pair.getFirst().getEncodedName()); 631 notDeployed++; 632 } else if (splitKeys != null 633 && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 634 for (byte[] splitKey : splitKeys) { 635 // Just check if the splitkey is available 636 if (Bytes.equals(info.getStartKey(), splitKey)) { 637 regionCount++; 638 break; 639 } 640 } 641 } else { 642 // Always empty start row should be counted 643 regionCount++; 644 } 645 } 646 if (notDeployed > 0) { 647 if (LOG.isDebugEnabled()) { 648 LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed); 649 } 650 return false; 651 } else if (splitKeys != null && regionCount != splitKeys.length + 1) { 652 if (LOG.isDebugEnabled()) { 653 LOG.debug("Table {} expected to have {} regions, but only {} available", tableName, 654 splitKeys.length + 1, regionCount); 655 } 656 return false; 657 } else { 658 LOG.trace("Table {} should be available", tableName); 659 return true; 660 } 661 } catch (TableNotFoundException tnfe) { 662 LOG.warn("Table {} does not exist", tableName); 663 return false; 664 } 665 } 666 667 @Override 668 public HRegionLocation locateRegion(final byte[] regionName) throws IOException { 669 RegionLocations locations = locateRegion(RegionInfo.getTable(regionName), 670 RegionInfo.getStartKey(regionName), false, true); 671 return locations == null ? null : locations.getRegionLocation(); 672 } 673 674 private boolean isDeadServer(ServerName sn) { 675 if (clusterStatusListener == null) { 676 return false; 677 } else { 678 return clusterStatusListener.isDeadServer(sn); 679 } 680 } 681 682 @Override 683 public List<HRegionLocation> locateRegions(TableName tableName) throws IOException { 684 return locateRegions(tableName, false, true); 685 } 686 687 @Override 688 public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache, 689 boolean offlined) throws IOException { 690 List<RegionInfo> regions; 691 if (TableName.isMetaTableName(tableName)) { 692 regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO); 693 } else { 694 regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined); 695 } 696 List<HRegionLocation> locations = new ArrayList<>(); 697 for (RegionInfo regionInfo : regions) { 698 if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) { 699 continue; 700 } 701 RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true); 702 if (list != null) { 703 for (HRegionLocation loc : list.getRegionLocations()) { 704 if (loc != null) { 705 locations.add(loc); 706 } 707 } 708 } 709 } 710 return locations; 711 } 712 713 @Override 714 public HRegionLocation locateRegion(final TableName tableName, final byte[] row) 715 throws IOException { 716 RegionLocations locations = locateRegion(tableName, row, true, true); 717 return locations == null ? null : locations.getRegionLocation(); 718 } 719 720 @Override 721 public HRegionLocation relocateRegion(final TableName tableName, final byte[] row) 722 throws IOException { 723 RegionLocations locations = 724 relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 725 return locations == null ? null 726 : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID); 727 } 728 729 @Override 730 public RegionLocations relocateRegion(final TableName tableName, 731 final byte [] row, int replicaId) throws IOException{ 732 // Since this is an explicit request not to use any caching, finding 733 // disabled tables should not be desirable. This will ensure that an exception is thrown when 734 // the first time a disabled table is interacted with. 735 if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) { 736 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); 737 } 738 739 return locateRegion(tableName, row, false, true, replicaId); 740 } 741 742 @Override 743 public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, 744 boolean retry) throws IOException { 745 return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); 746 } 747 748 @Override 749 public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, 750 boolean retry, int replicaId) throws IOException { 751 checkClosed(); 752 if (tableName == null || tableName.getName().length == 0) { 753 throw new IllegalArgumentException("table name cannot be null or zero length"); 754 } 755 if (tableName.equals(TableName.META_TABLE_NAME)) { 756 return locateMeta(tableName, useCache, replicaId); 757 } else { 758 // Region not in the cache - have to go to the meta RS 759 return locateRegionInMeta(tableName, row, useCache, retry, replicaId); 760 } 761 } 762 763 private RegionLocations locateMeta(final TableName tableName, 764 boolean useCache, int replicaId) throws IOException { 765 // HBASE-10785: We cache the location of the META itself, so that we are not overloading 766 // zookeeper with one request for every region lookup. We cache the META with empty row 767 // key in MetaCache. 768 byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta 769 RegionLocations locations = null; 770 if (useCache) { 771 locations = getCachedLocation(tableName, metaCacheKey); 772 if (locations != null && locations.getRegionLocation(replicaId) != null) { 773 return locations; 774 } 775 } 776 777 // only one thread should do the lookup. 778 synchronized (metaRegionLock) { 779 // Check the cache again for a hit in case some other thread made the 780 // same query while we were waiting on the lock. 781 if (useCache) { 782 locations = getCachedLocation(tableName, metaCacheKey); 783 if (locations != null && locations.getRegionLocation(replicaId) != null) { 784 return locations; 785 } 786 } 787 788 // Look up from zookeeper 789 locations = get(this.registry.getMetaRegionLocation()); 790 if (locations != null) { 791 cacheLocation(tableName, locations); 792 } 793 } 794 return locations; 795 } 796 797 /** 798 * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're 799 * seeking. 800 */ 801 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache, 802 boolean retry, int replicaId) throws IOException { 803 // If we are supposed to be using the cache, look in the cache to see if we already have the 804 // region. 805 if (useCache) { 806 RegionLocations locations = getCachedLocation(tableName, row); 807 if (locations != null && locations.getRegionLocation(replicaId) != null) { 808 return locations; 809 } 810 } 811 // build the key of the meta region we should be looking for. 812 // the extra 9's on the end are necessary to allow "exact" matches 813 // without knowing the precise region names. 814 byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false); 815 byte[] metaStopKey = 816 RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); 817 Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) 818 .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5) 819 .setReadType(ReadType.PREAD); 820 if (this.useMetaReplicas) { 821 s.setConsistency(Consistency.TIMELINE); 822 } 823 int maxAttempts = (retry ? numTries : 1); 824 boolean relocateMeta = false; 825 for (int tries = 0; ; tries++) { 826 if (tries >= maxAttempts) { 827 throw new NoServerForRegionException("Unable to find region for " 828 + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries."); 829 } 830 if (useCache) { 831 RegionLocations locations = getCachedLocation(tableName, row); 832 if (locations != null && locations.getRegionLocation(replicaId) != null) { 833 return locations; 834 } 835 } else { 836 // If we are not supposed to be using the cache, delete any existing cached location 837 // so it won't interfere. 838 // We are only supposed to clean the cache for the specific replicaId 839 metaCache.clearCache(tableName, row, replicaId); 840 } 841 // Query the meta region 842 long pauseBase = this.pause; 843 userRegionLock.lock(); 844 try { 845 if (useCache) {// re-check cache after get lock 846 RegionLocations locations = getCachedLocation(tableName, row); 847 if (locations != null && locations.getRegionLocation(replicaId) != null) { 848 return locations; 849 } 850 } 851 if (relocateMeta) { 852 relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, 853 RegionInfo.DEFAULT_REPLICA_ID); 854 } 855 s.resetMvccReadPoint(); 856 try (ReversedClientScanner rcs = 857 new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, 858 rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) { 859 boolean tableNotFound = true; 860 for (;;) { 861 Result regionInfoRow = rcs.next(); 862 if (regionInfoRow == null) { 863 if (tableNotFound) { 864 throw new TableNotFoundException(tableName); 865 } else { 866 throw new IOException( 867 "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName); 868 } 869 } 870 tableNotFound = false; 871 // convert the row result into the HRegionLocation we need! 872 RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow); 873 if (locations == null || locations.getRegionLocation(replicaId) == null) { 874 throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow); 875 } 876 RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion(); 877 if (regionInfo == null) { 878 throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME + 879 ", row=" + regionInfoRow); 880 } 881 // See HBASE-20182. It is possible that we locate to a split parent even after the 882 // children are online, so here we need to skip this region and go to the next one. 883 if (regionInfo.isSplitParent()) { 884 continue; 885 } 886 if (regionInfo.isOffline()) { 887 throw new RegionOfflineException("Region offline; disable table call? " + 888 regionInfo.getRegionNameAsString()); 889 } 890 // It is possible that the split children have not been online yet and we have skipped 891 // the parent in the above condition, so we may have already reached a region which does 892 // not contains us. 893 if (!regionInfo.containsRow(row)) { 894 throw new IOException( 895 "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName); 896 } 897 ServerName serverName = locations.getRegionLocation(replicaId).getServerName(); 898 if (serverName == null) { 899 throw new NoServerForRegionException("No server address listed in " + 900 TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() + 901 " containing row " + Bytes.toStringBinary(row)); 902 } 903 if (isDeadServer(serverName)) { 904 throw new RegionServerStoppedException( 905 "hbase:meta says the region " + regionInfo.getRegionNameAsString() + 906 " is managed by the server " + serverName + ", but it is dead."); 907 } 908 // Instantiate the location 909 cacheLocation(tableName, locations); 910 return locations; 911 } 912 } 913 } catch (TableNotFoundException e) { 914 // if we got this error, probably means the table just plain doesn't 915 // exist. rethrow the error immediately. this should always be coming 916 // from the HTable constructor. 917 throw e; 918 } catch (IOException e) { 919 ExceptionUtil.rethrowIfInterrupt(e); 920 if (e instanceof RemoteException) { 921 e = ((RemoteException)e).unwrapRemoteException(); 922 } 923 if (e instanceof CallQueueTooBigException) { 924 // Give a special check on CallQueueTooBigException, see #HBASE-17114 925 pauseBase = this.pauseForCQTBE; 926 } 927 if (tries < maxAttempts - 1) { 928 LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " + 929 "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e); 930 } else { 931 throw e; 932 } 933 // Only relocate the parent region if necessary 934 relocateMeta = 935 !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException); 936 } finally { 937 userRegionLock.unlock(); 938 } 939 try{ 940 Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries)); 941 } catch (InterruptedException e) { 942 throw new InterruptedIOException("Giving up trying to location region in " + 943 "meta: thread is interrupted."); 944 } 945 } 946 } 947 948 /** 949 * Put a newly discovered HRegionLocation into the cache. 950 * @param tableName The table name. 951 * @param location the new location 952 */ 953 @Override 954 public void cacheLocation(final TableName tableName, final RegionLocations location) { 955 metaCache.cacheLocation(tableName, location); 956 } 957 958 /** 959 * Search the cache for a location that fits our table and row key. 960 * Return null if no suitable region is located. 961 * @return Null or region location found in cache. 962 */ 963 RegionLocations getCachedLocation(final TableName tableName, 964 final byte [] row) { 965 return metaCache.getCachedLocation(tableName, row); 966 } 967 968 public void clearRegionCache(final TableName tableName, byte[] row) { 969 metaCache.clearCache(tableName, row); 970 } 971 972 /* 973 * Delete all cached entries of a table that maps to a specific location. 974 */ 975 @Override 976 public void clearCaches(final ServerName serverName) { 977 metaCache.clearCache(serverName); 978 } 979 980 @Override 981 public void clearRegionCache() { 982 metaCache.clearCache(); 983 } 984 985 @Override 986 public void clearRegionCache(final TableName tableName) { 987 metaCache.clearCache(tableName); 988 } 989 990 /** 991 * Put a newly discovered HRegionLocation into the cache. 992 * @param tableName The table name. 993 * @param source the source of the new location, if it's not coming from meta 994 * @param location the new location 995 */ 996 private void cacheLocation(final TableName tableName, final ServerName source, 997 final HRegionLocation location) { 998 metaCache.cacheLocation(tableName, source, location); 999 } 1000 1001 // Map keyed by service name + regionserver to service stub implementation 1002 private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>(); 1003 1004 /** 1005 * State of the MasterService connection/setup. 1006 */ 1007 static class MasterServiceState { 1008 Connection connection; 1009 1010 MasterProtos.MasterService.BlockingInterface stub; 1011 int userCount; 1012 1013 MasterServiceState(final Connection connection) { 1014 super(); 1015 this.connection = connection; 1016 } 1017 1018 @Override 1019 public String toString() { 1020 return "MasterService"; 1021 } 1022 1023 Object getStub() { 1024 return this.stub; 1025 } 1026 1027 void clearStub() { 1028 this.stub = null; 1029 } 1030 1031 boolean isMasterRunning() throws IOException { 1032 MasterProtos.IsMasterRunningResponse response = null; 1033 try { 1034 response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); 1035 } catch (Exception e) { 1036 throw ProtobufUtil.handleRemoteException(e); 1037 } 1038 return response != null? response.getIsMasterRunning(): false; 1039 } 1040 } 1041 1042 /** 1043 * The record of errors for servers. 1044 */ 1045 static class ServerErrorTracker { 1046 // We need a concurrent map here, as we could have multiple threads updating it in parallel. 1047 private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>(); 1048 private final long canRetryUntil; 1049 private final int maxTries;// max number to try 1050 private final long startTrackingTime; 1051 1052 /** 1053 * Constructor 1054 * @param timeout how long to wait before timeout, in unit of millisecond 1055 * @param maxTries how many times to try 1056 */ 1057 public ServerErrorTracker(long timeout, int maxTries) { 1058 this.maxTries = maxTries; 1059 this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; 1060 this.startTrackingTime = new Date().getTime(); 1061 } 1062 1063 /** 1064 * We stop to retry when we have exhausted BOTH the number of tries and the time allocated. 1065 * @param numAttempt how many times we have tried by now 1066 */ 1067 boolean canTryMore(int numAttempt) { 1068 // If there is a single try we must not take into account the time. 1069 return numAttempt < maxTries || (maxTries > 1 && 1070 EnvironmentEdgeManager.currentTime() < this.canRetryUntil); 1071 } 1072 1073 /** 1074 * Calculates the back-off time for a retrying request to a particular server. 1075 * 1076 * @param server The server in question. 1077 * @param basePause The default hci pause. 1078 * @return The time to wait before sending next request. 1079 */ 1080 long calculateBackoffTime(ServerName server, long basePause) { 1081 long result; 1082 ServerErrors errorStats = errorsByServer.get(server); 1083 if (errorStats != null) { 1084 result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1)); 1085 } else { 1086 result = 0; // yes, if the server is not in our list we don't wait before retrying. 1087 } 1088 return result; 1089 } 1090 1091 /** 1092 * Reports that there was an error on the server to do whatever bean-counting necessary. 1093 * @param server The server in question. 1094 */ 1095 void reportServerError(ServerName server) { 1096 computeIfAbsent(errorsByServer, server, ServerErrors::new).addError(); 1097 } 1098 1099 long getStartTrackingTime() { 1100 return startTrackingTime; 1101 } 1102 1103 /** 1104 * The record of errors for a server. 1105 */ 1106 private static class ServerErrors { 1107 private final AtomicInteger retries = new AtomicInteger(0); 1108 1109 public int getCount() { 1110 return retries.get(); 1111 } 1112 1113 public void addError() { 1114 retries.incrementAndGet(); 1115 } 1116 } 1117 } 1118 1119 /** 1120 * Class to make a MasterServiceStubMaker stub. 1121 */ 1122 private final class MasterServiceStubMaker { 1123 1124 private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub) 1125 throws IOException { 1126 try { 1127 stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); 1128 } catch (ServiceException e) { 1129 throw ProtobufUtil.handleRemoteException(e); 1130 } 1131 } 1132 1133 /** 1134 * Create a stub. Try once only. It is not typed because there is no common type to protobuf 1135 * services nor their interfaces. Let the caller do appropriate casting. 1136 * @return A stub for master services. 1137 */ 1138 private MasterProtos.MasterService.BlockingInterface makeStubNoRetries() 1139 throws IOException, KeeperException { 1140 ServerName sn = get(registry.getMasterAddress()); 1141 if (sn == null) { 1142 String msg = "ZooKeeper available but no active master location found"; 1143 LOG.info(msg); 1144 throw new MasterNotRunningException(msg); 1145 } 1146 if (isDeadServer(sn)) { 1147 throw new MasterNotRunningException(sn + " is dead."); 1148 } 1149 // Use the security info interface name as our stub key 1150 String key = 1151 getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange); 1152 MasterProtos.MasterService.BlockingInterface stub = 1153 (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1154 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); 1155 return MasterProtos.MasterService.newBlockingStub(channel); 1156 }); 1157 isMasterRunning(stub); 1158 return stub; 1159 } 1160 1161 /** 1162 * Create a stub against the master. Retry if necessary. 1163 * @return A stub to do <code>intf</code> against the master 1164 * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running 1165 */ 1166 MasterProtos.MasterService.BlockingInterface makeStub() throws IOException { 1167 // The lock must be at the beginning to prevent multiple master creations 1168 // (and leaks) in a multithread context 1169 synchronized (masterLock) { 1170 Exception exceptionCaught = null; 1171 if (!closed) { 1172 try { 1173 return makeStubNoRetries(); 1174 } catch (IOException e) { 1175 exceptionCaught = e; 1176 } catch (KeeperException e) { 1177 exceptionCaught = e; 1178 } 1179 throw new MasterNotRunningException(exceptionCaught); 1180 } else { 1181 throw new DoNotRetryIOException("Connection was closed while trying to get master"); 1182 } 1183 } 1184 } 1185 } 1186 1187 @Override 1188 public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException { 1189 return getAdmin(get(registry.getMasterAddress())); 1190 } 1191 1192 @Override 1193 public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName) 1194 throws IOException { 1195 checkClosed(); 1196 if (isDeadServer(serverName)) { 1197 throw new RegionServerStoppedException(serverName + " is dead."); 1198 } 1199 String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName, 1200 this.hostnamesCanChange); 1201 return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1202 BlockingRpcChannel channel = 1203 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); 1204 return AdminProtos.AdminService.newBlockingStub(channel); 1205 }); 1206 } 1207 1208 @Override 1209 public BlockingInterface getClient(ServerName serverName) throws IOException { 1210 checkClosed(); 1211 if (isDeadServer(serverName)) { 1212 throw new RegionServerStoppedException(serverName + " is dead."); 1213 } 1214 String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), 1215 serverName, this.hostnamesCanChange); 1216 return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1217 BlockingRpcChannel channel = 1218 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); 1219 return ClientProtos.ClientService.newBlockingStub(channel); 1220 }); 1221 } 1222 1223 final MasterServiceState masterServiceState = new MasterServiceState(this); 1224 1225 @Override 1226 public MasterKeepAliveConnection getMaster() throws IOException { 1227 return getKeepAliveMasterService(); 1228 } 1229 1230 private void resetMasterServiceState(final MasterServiceState mss) { 1231 mss.userCount++; 1232 } 1233 1234 private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException { 1235 synchronized (masterLock) { 1236 if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) { 1237 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker(); 1238 this.masterServiceState.stub = stubMaker.makeStub(); 1239 } 1240 resetMasterServiceState(this.masterServiceState); 1241 } 1242 // Ugly delegation just so we can add in a Close method. 1243 final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub; 1244 return new MasterKeepAliveConnection() { 1245 MasterServiceState mss = masterServiceState; 1246 1247 @Override 1248 public MasterProtos.AbortProcedureResponse abortProcedure( 1249 RpcController controller, 1250 MasterProtos.AbortProcedureRequest request) throws ServiceException { 1251 return stub.abortProcedure(controller, request); 1252 } 1253 1254 @Override 1255 public MasterProtos.GetProceduresResponse getProcedures( 1256 RpcController controller, 1257 MasterProtos.GetProceduresRequest request) throws ServiceException { 1258 return stub.getProcedures(controller, request); 1259 } 1260 1261 @Override 1262 public MasterProtos.GetLocksResponse getLocks( 1263 RpcController controller, 1264 MasterProtos.GetLocksRequest request) throws ServiceException { 1265 return stub.getLocks(controller, request); 1266 } 1267 1268 @Override 1269 public MasterProtos.AddColumnResponse addColumn( 1270 RpcController controller, 1271 MasterProtos.AddColumnRequest request) throws ServiceException { 1272 return stub.addColumn(controller, request); 1273 } 1274 1275 @Override 1276 public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller, 1277 MasterProtos.DeleteColumnRequest request) 1278 throws ServiceException { 1279 return stub.deleteColumn(controller, request); 1280 } 1281 1282 @Override 1283 public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller, 1284 MasterProtos.ModifyColumnRequest request) 1285 throws ServiceException { 1286 return stub.modifyColumn(controller, request); 1287 } 1288 1289 @Override 1290 public MasterProtos.MoveRegionResponse moveRegion(RpcController controller, 1291 MasterProtos.MoveRegionRequest request) throws ServiceException { 1292 return stub.moveRegion(controller, request); 1293 } 1294 1295 @Override 1296 public MasterProtos.MergeTableRegionsResponse mergeTableRegions( 1297 RpcController controller, MasterProtos.MergeTableRegionsRequest request) 1298 throws ServiceException { 1299 return stub.mergeTableRegions(controller, request); 1300 } 1301 1302 @Override 1303 public MasterProtos.AssignRegionResponse assignRegion(RpcController controller, 1304 MasterProtos.AssignRegionRequest request) throws ServiceException { 1305 return stub.assignRegion(controller, request); 1306 } 1307 1308 @Override 1309 public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller, 1310 MasterProtos.UnassignRegionRequest request) throws ServiceException { 1311 return stub.unassignRegion(controller, request); 1312 } 1313 1314 @Override 1315 public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller, 1316 MasterProtos.OfflineRegionRequest request) throws ServiceException { 1317 return stub.offlineRegion(controller, request); 1318 } 1319 1320 @Override 1321 public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller, 1322 MasterProtos.SplitTableRegionRequest request) throws ServiceException { 1323 return stub.splitRegion(controller, request); 1324 } 1325 1326 @Override 1327 public MasterProtos.DeleteTableResponse deleteTable(RpcController controller, 1328 MasterProtos.DeleteTableRequest request) throws ServiceException { 1329 return stub.deleteTable(controller, request); 1330 } 1331 1332 @Override 1333 public MasterProtos.TruncateTableResponse truncateTable(RpcController controller, 1334 MasterProtos.TruncateTableRequest request) throws ServiceException { 1335 return stub.truncateTable(controller, request); 1336 } 1337 1338 @Override 1339 public MasterProtos.EnableTableResponse enableTable(RpcController controller, 1340 MasterProtos.EnableTableRequest request) throws ServiceException { 1341 return stub.enableTable(controller, request); 1342 } 1343 1344 @Override 1345 public MasterProtos.DisableTableResponse disableTable(RpcController controller, 1346 MasterProtos.DisableTableRequest request) throws ServiceException { 1347 return stub.disableTable(controller, request); 1348 } 1349 1350 @Override 1351 public MasterProtos.ModifyTableResponse modifyTable(RpcController controller, 1352 MasterProtos.ModifyTableRequest request) throws ServiceException { 1353 return stub.modifyTable(controller, request); 1354 } 1355 1356 @Override 1357 public MasterProtos.CreateTableResponse createTable(RpcController controller, 1358 MasterProtos.CreateTableRequest request) throws ServiceException { 1359 return stub.createTable(controller, request); 1360 } 1361 1362 @Override 1363 public MasterProtos.ShutdownResponse shutdown(RpcController controller, 1364 MasterProtos.ShutdownRequest request) throws ServiceException { 1365 return stub.shutdown(controller, request); 1366 } 1367 1368 @Override 1369 public MasterProtos.StopMasterResponse stopMaster(RpcController controller, 1370 MasterProtos.StopMasterRequest request) throws ServiceException { 1371 return stub.stopMaster(controller, request); 1372 } 1373 1374 @Override 1375 public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode( 1376 final RpcController controller, 1377 final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException { 1378 return stub.isMasterInMaintenanceMode(controller, request); 1379 } 1380 1381 @Override 1382 public MasterProtos.BalanceResponse balance(RpcController controller, 1383 MasterProtos.BalanceRequest request) throws ServiceException { 1384 return stub.balance(controller, request); 1385 } 1386 1387 @Override 1388 public MasterProtos.SetBalancerRunningResponse setBalancerRunning( 1389 RpcController controller, MasterProtos.SetBalancerRunningRequest request) 1390 throws ServiceException { 1391 return stub.setBalancerRunning(controller, request); 1392 } 1393 1394 @Override 1395 public NormalizeResponse normalize(RpcController controller, 1396 NormalizeRequest request) throws ServiceException { 1397 return stub.normalize(controller, request); 1398 } 1399 1400 @Override 1401 public SetNormalizerRunningResponse setNormalizerRunning( 1402 RpcController controller, SetNormalizerRunningRequest request) 1403 throws ServiceException { 1404 return stub.setNormalizerRunning(controller, request); 1405 } 1406 1407 @Override 1408 public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller, 1409 MasterProtos.RunCatalogScanRequest request) throws ServiceException { 1410 return stub.runCatalogScan(controller, request); 1411 } 1412 1413 @Override 1414 public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor( 1415 RpcController controller, MasterProtos.EnableCatalogJanitorRequest request) 1416 throws ServiceException { 1417 return stub.enableCatalogJanitor(controller, request); 1418 } 1419 1420 @Override 1421 public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled( 1422 RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request) 1423 throws ServiceException { 1424 return stub.isCatalogJanitorEnabled(controller, request); 1425 } 1426 1427 @Override 1428 public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller, 1429 MasterProtos.RunCleanerChoreRequest request) 1430 throws ServiceException { 1431 return stub.runCleanerChore(controller, request); 1432 } 1433 1434 @Override 1435 public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning( 1436 RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request) 1437 throws ServiceException { 1438 return stub.setCleanerChoreRunning(controller, request); 1439 } 1440 1441 @Override 1442 public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled( 1443 RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request) 1444 throws ServiceException { 1445 return stub.isCleanerChoreEnabled(controller, request); 1446 } 1447 1448 @Override 1449 public ClientProtos.CoprocessorServiceResponse execMasterService( 1450 RpcController controller, ClientProtos.CoprocessorServiceRequest request) 1451 throws ServiceException { 1452 return stub.execMasterService(controller, request); 1453 } 1454 1455 @Override 1456 public MasterProtos.SnapshotResponse snapshot(RpcController controller, 1457 MasterProtos.SnapshotRequest request) throws ServiceException { 1458 return stub.snapshot(controller, request); 1459 } 1460 1461 @Override 1462 public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots( 1463 RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request) 1464 throws ServiceException { 1465 return stub.getCompletedSnapshots(controller, request); 1466 } 1467 1468 @Override 1469 public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller, 1470 MasterProtos.DeleteSnapshotRequest request) throws ServiceException { 1471 return stub.deleteSnapshot(controller, request); 1472 } 1473 1474 @Override 1475 public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller, 1476 MasterProtos.IsSnapshotDoneRequest request) throws ServiceException { 1477 return stub.isSnapshotDone(controller, request); 1478 } 1479 1480 @Override 1481 public MasterProtos.RestoreSnapshotResponse restoreSnapshot( 1482 RpcController controller, MasterProtos.RestoreSnapshotRequest request) 1483 throws ServiceException { 1484 return stub.restoreSnapshot(controller, request); 1485 } 1486 1487 @Override 1488 public MasterProtos.ExecProcedureResponse execProcedure( 1489 RpcController controller, MasterProtos.ExecProcedureRequest request) 1490 throws ServiceException { 1491 return stub.execProcedure(controller, request); 1492 } 1493 1494 @Override 1495 public MasterProtos.ExecProcedureResponse execProcedureWithRet( 1496 RpcController controller, MasterProtos.ExecProcedureRequest request) 1497 throws ServiceException { 1498 return stub.execProcedureWithRet(controller, request); 1499 } 1500 1501 @Override 1502 public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller, 1503 MasterProtos.IsProcedureDoneRequest request) throws ServiceException { 1504 return stub.isProcedureDone(controller, request); 1505 } 1506 1507 @Override 1508 public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller, 1509 MasterProtos.GetProcedureResultRequest request) throws ServiceException { 1510 return stub.getProcedureResult(controller, request); 1511 } 1512 1513 @Override 1514 public MasterProtos.IsMasterRunningResponse isMasterRunning( 1515 RpcController controller, MasterProtos.IsMasterRunningRequest request) 1516 throws ServiceException { 1517 return stub.isMasterRunning(controller, request); 1518 } 1519 1520 @Override 1521 public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller, 1522 MasterProtos.ModifyNamespaceRequest request) 1523 throws ServiceException { 1524 return stub.modifyNamespace(controller, request); 1525 } 1526 1527 @Override 1528 public MasterProtos.CreateNamespaceResponse createNamespace( 1529 RpcController controller, 1530 MasterProtos.CreateNamespaceRequest request) throws ServiceException { 1531 return stub.createNamespace(controller, request); 1532 } 1533 1534 @Override 1535 public MasterProtos.DeleteNamespaceResponse deleteNamespace( 1536 RpcController controller, 1537 MasterProtos.DeleteNamespaceRequest request) throws ServiceException { 1538 return stub.deleteNamespace(controller, request); 1539 } 1540 1541 @Override 1542 public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor( 1543 RpcController controller, 1544 MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException { 1545 return stub.getNamespaceDescriptor(controller, request); 1546 } 1547 1548 @Override 1549 public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors( 1550 RpcController controller, 1551 MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException { 1552 return stub.listNamespaceDescriptors(controller, request); 1553 } 1554 1555 @Override 1556 public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace( 1557 RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request) 1558 throws ServiceException { 1559 return stub.listTableDescriptorsByNamespace(controller, request); 1560 } 1561 1562 @Override 1563 public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace( 1564 RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request) 1565 throws ServiceException { 1566 return stub.listTableNamesByNamespace(controller, request); 1567 } 1568 1569 @Override 1570 public MasterProtos.GetTableStateResponse getTableState( 1571 RpcController controller, MasterProtos.GetTableStateRequest request) 1572 throws ServiceException { 1573 return stub.getTableState(controller, request); 1574 } 1575 1576 @Override 1577 public void close() { 1578 release(this.mss); 1579 } 1580 1581 @Override 1582 public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus( 1583 RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request) 1584 throws ServiceException { 1585 return stub.getSchemaAlterStatus(controller, request); 1586 } 1587 1588 @Override 1589 public MasterProtos.GetTableDescriptorsResponse getTableDescriptors( 1590 RpcController controller, MasterProtos.GetTableDescriptorsRequest request) 1591 throws ServiceException { 1592 return stub.getTableDescriptors(controller, request); 1593 } 1594 1595 @Override 1596 public MasterProtos.GetTableNamesResponse getTableNames( 1597 RpcController controller, MasterProtos.GetTableNamesRequest request) 1598 throws ServiceException { 1599 return stub.getTableNames(controller, request); 1600 } 1601 1602 @Override 1603 public MasterProtos.GetClusterStatusResponse getClusterStatus( 1604 RpcController controller, MasterProtos.GetClusterStatusRequest request) 1605 throws ServiceException { 1606 return stub.getClusterStatus(controller, request); 1607 } 1608 1609 @Override 1610 public MasterProtos.SetQuotaResponse setQuota( 1611 RpcController controller, MasterProtos.SetQuotaRequest request) 1612 throws ServiceException { 1613 return stub.setQuota(controller, request); 1614 } 1615 1616 @Override 1617 public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp( 1618 RpcController controller, MasterProtos.MajorCompactionTimestampRequest request) 1619 throws ServiceException { 1620 return stub.getLastMajorCompactionTimestamp(controller, request); 1621 } 1622 1623 @Override 1624 public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion( 1625 RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request) 1626 throws ServiceException { 1627 return stub.getLastMajorCompactionTimestampForRegion(controller, request); 1628 } 1629 1630 @Override 1631 public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller, 1632 IsBalancerEnabledRequest request) throws ServiceException { 1633 return stub.isBalancerEnabled(controller, request); 1634 } 1635 1636 @Override 1637 public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled( 1638 RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request) 1639 throws ServiceException { 1640 return stub.setSplitOrMergeEnabled(controller, request); 1641 } 1642 1643 @Override 1644 public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled( 1645 RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request) 1646 throws ServiceException { 1647 return stub.isSplitOrMergeEnabled(controller, request); 1648 } 1649 1650 @Override 1651 public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller, 1652 IsNormalizerEnabledRequest request) throws ServiceException { 1653 return stub.isNormalizerEnabled(controller, request); 1654 } 1655 1656 @Override 1657 public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller, 1658 SecurityCapabilitiesRequest request) throws ServiceException { 1659 return stub.getSecurityCapabilities(controller, request); 1660 } 1661 1662 @Override 1663 public AddReplicationPeerResponse addReplicationPeer(RpcController controller, 1664 AddReplicationPeerRequest request) throws ServiceException { 1665 return stub.addReplicationPeer(controller, request); 1666 } 1667 1668 @Override 1669 public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller, 1670 RemoveReplicationPeerRequest request) throws ServiceException { 1671 return stub.removeReplicationPeer(controller, request); 1672 } 1673 1674 @Override 1675 public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller, 1676 EnableReplicationPeerRequest request) throws ServiceException { 1677 return stub.enableReplicationPeer(controller, request); 1678 } 1679 1680 @Override 1681 public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller, 1682 DisableReplicationPeerRequest request) throws ServiceException { 1683 return stub.disableReplicationPeer(controller, request); 1684 } 1685 1686 @Override 1687 public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller, 1688 ListDecommissionedRegionServersRequest request) throws ServiceException { 1689 return stub.listDecommissionedRegionServers(controller, request); 1690 } 1691 1692 @Override 1693 public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller, 1694 DecommissionRegionServersRequest request) throws ServiceException { 1695 return stub.decommissionRegionServers(controller, request); 1696 } 1697 1698 @Override 1699 public RecommissionRegionServerResponse recommissionRegionServer( 1700 RpcController controller, RecommissionRegionServerRequest request) 1701 throws ServiceException { 1702 return stub.recommissionRegionServer(controller, request); 1703 } 1704 1705 @Override 1706 public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller, 1707 GetReplicationPeerConfigRequest request) throws ServiceException { 1708 return stub.getReplicationPeerConfig(controller, request); 1709 } 1710 1711 @Override 1712 public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig( 1713 RpcController controller, UpdateReplicationPeerConfigRequest request) 1714 throws ServiceException { 1715 return stub.updateReplicationPeerConfig(controller, request); 1716 } 1717 1718 @Override 1719 public ListReplicationPeersResponse listReplicationPeers(RpcController controller, 1720 ListReplicationPeersRequest request) throws ServiceException { 1721 return stub.listReplicationPeers(controller, request); 1722 } 1723 1724 @Override 1725 public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes( 1726 RpcController controller, GetSpaceQuotaRegionSizesRequest request) 1727 throws ServiceException { 1728 return stub.getSpaceQuotaRegionSizes(controller, request); 1729 } 1730 1731 @Override 1732 public GetQuotaStatesResponse getQuotaStates( 1733 RpcController controller, GetQuotaStatesRequest request) throws ServiceException { 1734 return stub.getQuotaStates(controller, request); 1735 } 1736 1737 @Override 1738 public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller, 1739 MasterProtos.ClearDeadServersRequest request) throws ServiceException { 1740 return stub.clearDeadServers(controller, request); 1741 } 1742 }; 1743 } 1744 1745 private static void release(MasterServiceState mss) { 1746 if (mss != null && mss.connection != null) { 1747 ((ConnectionImplementation)mss.connection).releaseMaster(mss); 1748 } 1749 } 1750 1751 private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) { 1752 if (mss.getStub() == null){ 1753 return false; 1754 } 1755 try { 1756 return mss.isMasterRunning(); 1757 } catch (UndeclaredThrowableException e) { 1758 // It's somehow messy, but we can receive exceptions such as 1759 // java.net.ConnectException but they're not declared. So we catch it... 1760 LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable()); 1761 return false; 1762 } catch (IOException se) { 1763 LOG.warn("Checking master connection", se); 1764 return false; 1765 } 1766 } 1767 1768 void releaseMaster(MasterServiceState mss) { 1769 if (mss.getStub() == null) { 1770 return; 1771 } 1772 synchronized (masterLock) { 1773 --mss.userCount; 1774 } 1775 } 1776 1777 private void closeMasterService(MasterServiceState mss) { 1778 if (mss.getStub() != null) { 1779 LOG.info("Closing master protocol: " + mss); 1780 mss.clearStub(); 1781 } 1782 mss.userCount = 0; 1783 } 1784 1785 /** 1786 * Immediate close of the shared master. Can be by the delayed close or when closing the 1787 * connection itself. 1788 */ 1789 private void closeMaster() { 1790 synchronized (masterLock) { 1791 closeMasterService(masterServiceState); 1792 } 1793 } 1794 1795 void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) { 1796 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum); 1797 cacheLocation(hri.getTable(), source, newHrl); 1798 } 1799 1800 @Override 1801 public void deleteCachedRegionLocation(final HRegionLocation location) { 1802 metaCache.clearCache(location); 1803 } 1804 1805 /** 1806 * Update the location with the new value (if the exception is a RegionMovedException) 1807 * or delete it from the cache. Does nothing if we can be sure from the exception that 1808 * the location is still accurate, or if the cache has already been updated. 1809 * @param exception an object (to simplify user code) on which we will try to find a nested 1810 * or wrapped or both RegionMovedException 1811 * @param source server that is the source of the location update. 1812 */ 1813 @Override 1814 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey, 1815 final Object exception, final ServerName source) { 1816 if (rowkey == null || tableName == null) { 1817 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) + 1818 ", tableName=" + (tableName == null ? "null" : tableName)); 1819 return; 1820 } 1821 1822 if (source == null) { 1823 // This should not happen, but let's secure ourselves. 1824 return; 1825 } 1826 1827 if (regionName == null) { 1828 // we do not know which region, so just remove the cache entry for the row and server 1829 if (metrics != null) { 1830 metrics.incrCacheDroppingExceptions(exception); 1831 } 1832 metaCache.clearCache(tableName, rowkey, source); 1833 return; 1834 } 1835 1836 // Is it something we have already updated? 1837 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey); 1838 HRegionLocation oldLocation = null; 1839 if (oldLocations != null) { 1840 oldLocation = oldLocations.getRegionLocationByRegionName(regionName); 1841 } 1842 if (oldLocation == null || !source.equals(oldLocation.getServerName())) { 1843 // There is no such location in the cache (it's been removed already) or 1844 // the cache has already been refreshed with a different location. => nothing to do 1845 return; 1846 } 1847 1848 RegionInfo regionInfo = oldLocation.getRegion(); 1849 Throwable cause = ClientExceptionsUtil.findException(exception); 1850 if (cause != null) { 1851 if (!ClientExceptionsUtil.isMetaClearingException(cause)) { 1852 // We know that the region is still on this region server 1853 return; 1854 } 1855 1856 if (cause instanceof RegionMovedException) { 1857 RegionMovedException rme = (RegionMovedException) cause; 1858 if (LOG.isTraceEnabled()) { 1859 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " + 1860 rme.getHostname() + ":" + rme.getPort() + 1861 " according to " + source.getAddress()); 1862 } 1863 // We know that the region is not anymore on this region server, but we know 1864 // the new location. 1865 updateCachedLocation( 1866 regionInfo, source, rme.getServerName(), rme.getLocationSeqNum()); 1867 return; 1868 } 1869 } 1870 1871 if (metrics != null) { 1872 metrics.incrCacheDroppingExceptions(exception); 1873 } 1874 1875 // If we're here, it means that can cannot be sure about the location, so we remove it from 1876 // the cache. Do not send the source because source can be a new server in the same host:port 1877 metaCache.clearCache(regionInfo); 1878 } 1879 1880 @Override 1881 public AsyncProcess getAsyncProcess() { 1882 return asyncProcess; 1883 } 1884 1885 @Override 1886 public ServerStatisticTracker getStatisticsTracker() { 1887 return this.stats; 1888 } 1889 1890 @Override 1891 public ClientBackoffPolicy getBackoffPolicy() { 1892 return this.backoffPolicy; 1893 } 1894 1895 /* 1896 * Return the number of cached region for a table. It will only be called 1897 * from a unit test. 1898 */ 1899 @VisibleForTesting 1900 int getNumberOfCachedRegionLocations(final TableName tableName) { 1901 return metaCache.getNumberOfCachedRegionLocations(tableName); 1902 } 1903 1904 @Override 1905 public void abort(final String msg, Throwable t) { 1906 if (t != null) { 1907 LOG.error(HBaseMarkers.FATAL, msg, t); 1908 } else { 1909 LOG.error(HBaseMarkers.FATAL, msg); 1910 } 1911 this.aborted = true; 1912 close(); 1913 this.closed = true; 1914 } 1915 1916 @Override 1917 public boolean isClosed() { 1918 return this.closed; 1919 } 1920 1921 @Override 1922 public boolean isAborted(){ 1923 return this.aborted; 1924 } 1925 1926 @Override 1927 public void close() { 1928 if (this.closed) { 1929 return; 1930 } 1931 closeMaster(); 1932 shutdownPools(); 1933 if (this.metrics != null) { 1934 this.metrics.shutdown(); 1935 } 1936 this.closed = true; 1937 registry.close(); 1938 this.stubs.clear(); 1939 if (clusterStatusListener != null) { 1940 clusterStatusListener.close(); 1941 } 1942 if (rpcClient != null) { 1943 rpcClient.close(); 1944 } 1945 } 1946 1947 /** 1948 * Close the connection for good. On the off chance that someone is unable to close 1949 * the connection, perhaps because it bailed out prematurely, the method 1950 * below will ensure that this instance is cleaned up. 1951 * Caveat: The JVM may take an unknown amount of time to call finalize on an 1952 * unreachable object, so our hope is that every consumer cleans up after 1953 * itself, like any good citizen. 1954 */ 1955 @Override 1956 protected void finalize() throws Throwable { 1957 super.finalize(); 1958 close(); 1959 } 1960 1961 @Override 1962 public NonceGenerator getNonceGenerator() { 1963 return nonceGenerator; 1964 } 1965 1966 @Override 1967 public TableState getTableState(TableName tableName) throws IOException { 1968 checkClosed(); 1969 TableState tableState = MetaTableAccessor.getTableState(this, tableName); 1970 if (tableState == null) { 1971 throw new TableNotFoundException(tableName); 1972 } 1973 return tableState; 1974 } 1975 1976 @Override 1977 public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { 1978 return RpcRetryingCallerFactory 1979 .instantiate(conf, this.interceptor, this.getStatisticsTracker()); 1980 } 1981 1982 @Override 1983 public boolean hasCellBlockSupport() { 1984 return this.rpcClient.hasCellBlockSupport(); 1985 } 1986 1987 @Override 1988 public ConnectionConfiguration getConnectionConfiguration() { 1989 return this.connectionConfig; 1990 } 1991 1992 @Override 1993 public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { 1994 return this.rpcCallerFactory; 1995 } 1996 1997 @Override 1998 public RpcControllerFactory getRpcControllerFactory() { 1999 return this.rpcControllerFactory; 2000 } 2001 2002 private static <T> T get(CompletableFuture<T> future) throws IOException { 2003 try { 2004 return future.get(); 2005 } catch (InterruptedException e) { 2006 Thread.currentThread().interrupt(); 2007 throw (IOException) new InterruptedIOException().initCause(e); 2008 } catch (ExecutionException e) { 2009 Throwable cause = e.getCause(); 2010 Throwables.propagateIfPossible(cause, IOException.class); 2011 throw new IOException(cause); 2012 } 2013 } 2014}