/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;

/**
 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
 * Encapsulates connection to zookeeper and regionservers.
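 * <p>
 * A minimal usage sketch (illustrative only; client code should obtain instances through
 * {@link ConnectionFactory}, the supported entry point, rather than constructing this class
 * directly):
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *      Table table = connection.getTable(TableName.valueOf("test"))) {
 *   table.get(new Get(Bytes.toBytes("row")));
 * }
 * </pre>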
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
    justification="Access to the concurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
class ConnectionImplementation implements ClusterConnection, Closeable {
  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

  private final boolean hostnamesCanChange;
  private final long pause;
  private final long pauseForCQTBE; // pause for CallQueueTooBigException, if specified
  private boolean useMetaReplicas;
  private final int metaReplicaCallTimeoutScanInMicroSecond;
  private final int numTries;
  final int rpcTimeout;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope.
   * Once it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
  private static final Object nonceGeneratorCreateLock = new Object();

  private final AsyncProcess asyncProcess;
  // single tracker per connection
  private final ServerStatisticTracker stats;

  private volatile boolean closed;
  private volatile boolean aborted;

  // package protected for the tests
  ClusterStatusListener clusterStatusListener;

  private final Object metaRegionLock = new Object();

  private final Object masterLock = new Object();

  // thread executor shared by all Table instances created
  // by this connection
  private volatile ThreadPoolExecutor batchPool = null;
  // meta thread executor shared by all Table instances created
  // by this connection
  private volatile ThreadPoolExecutor metaLookupPool = null;
  private volatile boolean cleanupPool = false;

  private final Configuration conf;

  // cache the configuration value for tables so that we can avoid calling
  // the expensive Configuration to fetch the value multiple times.
  private final ConnectionConfiguration connectionConfig;

  // Client rpc instance.
  private final RpcClient rpcClient;

  private final MetaCache metaCache;
  private final MetricsConnection metrics;

  protected User user;

  private final RpcRetryingCallerFactory rpcCallerFactory;

  private final RpcControllerFactory rpcControllerFactory;

  private final RetryingCallerInterceptor interceptor;

  /**
   * Cluster registry of basic info such as clusterid and meta region location.
   */
  private final AsyncRegistry registry;

  private final ClientBackoffPolicy backoffPolicy;

  /**
   * Allow setting an alternate BufferedMutator implementation via
   * config. If null, use default.
   */
  private final String alternateBufferedMutatorClassName;

  /** lock guards against multiple threads trying to query the meta region at the same time */
  private final ReentrantLock userRegionLock = new ReentrantLock();

  private ChoreService authService;

  /**
   * constructor
   * @param conf Configuration object
   */
  ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
    this.conf = conf;
    this.user = user;
    if (user != null && user.isLoginFromKeytab()) {
      spawnRenewalChore(user.getUGI());
    }
    this.batchPool = (ThreadPoolExecutor) pool;
    this.connectionConfig = new ConnectionConfiguration(conf);
    this.closed = false;
    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
    if (configuredPauseForCQTBE < pause) {
      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
          + ", will use " + pause + " instead.");
      this.pauseForCQTBE = pause;
    } else {
      this.pauseForCQTBE = configuredPauseForCQTBE;
    }
    this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
        HConstants.DEFAULT_USE_META_REPLICAS);
    this.metaReplicaCallTimeoutScanInMicroSecond =
        connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();

    // how many times to try, one more than max *retry* time
    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
    this.rpcTimeout = conf.getInt(
        HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
      synchronized (nonceGeneratorCreateLock) {
        if (nonceGenerator == null) {
          nonceGenerator = PerClientRandomNonceGenerator.get();
        }
      }
    } else {
      nonceGenerator = NO_NONCE_GENERATOR;
    }

    this.stats = ServerStatisticTracker.create(conf);
    this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);
    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
      this.metrics =
          new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool);
    } else {
      this.metrics = null;
    }
    this.metaCache = new MetaCache(this.metrics);

    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
    Class<? extends ClusterStatusListener.Listener> listenerClass =
        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
            ClusterStatusListener.Listener.class);

    // Is there an alternate BufferedMutator to use?
    this.alternateBufferedMutatorClassName =
        this.conf.get(BufferedMutator.CLASSNAME_KEY);

    try {
      this.registry = AsyncRegistryFactory.getRegistry(conf);
      retrieveClusterId();

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);

      // Do we publish the status?
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          clusterStatusListener = new ClusterStatusListener(
              new ClusterStatusListener.DeadServerHandler() {
                @Override
                public void newDead(ServerName sn) {
                  clearCaches(sn);
                  rpcClient.cancelConnections(sn);
                }
              }, conf, listenerClass);
        }
      }
    } catch (Throwable e) {
      // avoid leaks: registry, rpcClient, ...
      LOG.debug("connection construction failed", e);
      close();
      throw e;
    }
  }

  private void spawnRenewalChore(final UserGroupInformation user) {
    authService = new ChoreService("Relogin service");
    authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
  }

  /**
   * @param useMetaReplicas whether meta replicas should be used for meta lookups
   */
  @VisibleForTesting
  void setUseMetaReplicas(final boolean useMetaReplicas) {
    this.useMetaReplicas = useMetaReplicas;
  }

  /**
   * @param conn The connection for which to replace the generator.
   * @param cnm Replaces the nonce generator used, for testing.
   * @return old nonce generator.
   */
  @VisibleForTesting
  static NonceGenerator injectNonceGeneratorForTesting(
      ClusterConnection conn, NonceGenerator cnm) {
    ConnectionImplementation connImpl = (ConnectionImplementation)conn;
    NonceGenerator ng = connImpl.getNonceGenerator();
    LOG.warn("Nonce generator is being replaced by test code for "
        + cnm.getClass().getName());
    nonceGenerator = cnm;
    return ng;
  }

  @Override
  public Table getTable(TableName tableName) throws IOException {
    return getTable(tableName, getBatchPool());
  }

  @Override
  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
    return new TableBuilderBase(tableName, connectionConfig) {

      @Override
      public Table build() {
        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
            rpcControllerFactory, pool);
      }
    };
  }

  @Override
  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
    if (params.getTableName() == null) {
      throw new IllegalArgumentException("TableName cannot be null.");
    }
    if (params.getPool() == null) {
      params.pool(HTable.getDefaultExecutor(getConfiguration()));
    }
    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
      params.writeBufferSize(connectionConfig.getWriteBufferSize());
    }
    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
      params.setWriteBufferPeriodicFlushTimeoutMs(
          connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
    }
    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
      params.setWriteBufferPeriodicFlushTimerTickMs(
          connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
    }
    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
    }
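    // Illustrative usage only (table name "t1" is an example): callers normally reach this
    // method through the Connection interface, e.g.
    //   try (BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("t1"))) {
    //     mutator.mutate(new Put(Bytes.toBytes("row"))
    //         .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
    //   }
    // Any parameter left UNSET above has just been filled in from the connection configuration.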
    // Look to see if an alternate BufferedMutator implementation is wanted.
    // Look in params and in config. If null, use default.
    String implementationClassName = params.getImplementationClassName();
    if (implementationClassName == null) {
      implementationClassName = this.alternateBufferedMutatorClassName;
    }
    if (implementationClassName == null) {
      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
    }
    try {
      return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
          this, rpcCallerFactory, rpcControllerFactory, params);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public BufferedMutator getBufferedMutator(TableName tableName) {
    return getBufferedMutator(new BufferedMutatorParams(tableName));
  }

  @Override
  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
    return new HRegionLocator(tableName, this);
  }

  @Override
  public Admin getAdmin() throws IOException {
    return new HBaseAdmin(this);
  }

  @Override
  public Hbck getHbck() throws IOException {
    return getHbck(get(registry.getMasterAddress()));
  }

  @Override
  public Hbck getHbck(ServerName masterServer) throws IOException {
    checkClosed();
    if (isDeadServer(masterServer)) {
      throw new RegionServerStoppedException(masterServer + " is dead.");
    }
    String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(),
        masterServer, this.hostnamesCanChange);

    return new HBaseHbck(
        (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
          BlockingRpcChannel channel =
              this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
          return MasterProtos.HbckService.newBlockingStub(channel);
        }), rpcControllerFactory);
  }

  @Override
  public MetricsConnection getConnectionMetrics() {
    return this.metrics;
  }

  private ThreadPoolExecutor getBatchPool() {
    if (batchPool == null) {
      synchronized (this) {
        if (batchPool == null) {
          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
          this.batchPool = getThreadPool(threads, threads, "-shared", null);
          this.cleanupPool = true;
        }
      }
    }
    return this.batchPool;
  }

  private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
      BlockingQueue<Runnable> passedWorkQueue) {
    // shared HTable thread executor not yet initialized
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    if (coreThreads == 0) {
      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
    BlockingQueue<Runnable> workQueue = passedWorkQueue;
    if (workQueue == null) {
      workQueue =
          new LinkedBlockingQueue<>(maxThreads *
              conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
      coreThreads = maxThreads;
    }
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
        coreThreads,
        maxThreads,
        keepAliveTime,
        TimeUnit.SECONDS,
        workQueue,
        Threads.newDaemonThreadFactory(toString() + nameHint));
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }
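  // Sizing sketch for getThreadPool (illustrative; the property names are the ones read above):
  // a maxThreads/coreThreads value of 0 falls back to availableProcessors() * 8, and when no
  // work queue is passed in, a LinkedBlockingQueue bounded by maxThreads *
  // hbase.client.max.total.tasks is used with coreThreads raised to maxThreads, so once the
  // pool is at full size and that queue fills, further submissions are rejected (the executor's
  // default abort policy) instead of queueing without bound.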

  private ThreadPoolExecutor getMetaLookupPool() {
    if (this.metaLookupPool == null) {
      synchronized (this) {
        if (this.metaLookupPool == null) {
          // Some of the threads would be used for meta replicas.
          // To start with, threads.max.core threads can hit the meta (including replicas).
          // After that, requests will get queued up in the passed queue, and only after
          // the queue is full, a new thread will be started.
          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
          this.metaLookupPool = getThreadPool(
              threads,
              threads,
              "-metaLookup-shared-", new LinkedBlockingQueue<>());
        }
      }
    }
    return this.metaLookupPool;
  }

  protected ExecutorService getCurrentMetaLookupPool() {
    return metaLookupPool;
  }

  protected ExecutorService getCurrentBatchPool() {
    return batchPool;
  }

  private void shutdownPools() {
    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
      shutdownBatchPool(this.batchPool);
    }
    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
      shutdownBatchPool(this.metaLookupPool);
    }
  }

  private void shutdownBatchPool(ExecutorService pool) {
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        pool.shutdownNow();
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
    }
  }

  /**
   * For tests only.
   */
  @VisibleForTesting
  RpcClient getRpcClient() {
    return rpcClient;
  }

  /**
   * An identifier that will remain the same for a given connection.
   */
  @Override
  public String toString() {
    return "hconnection-0x" + Integer.toHexString(hashCode());
  }

  protected String clusterId = null;

  protected void retrieveClusterId() {
    if (clusterId != null) {
      return;
    }
    try {
      this.clusterId = this.registry.getClusterId().get();
    } catch (InterruptedException | ExecutionException e) {
      LOG.warn("Retrieve cluster id failed", e);
    }
    if (clusterId == null) {
      clusterId = HConstants.CLUSTER_ID_DEFAULT;
      LOG.debug("clusterid came back null, using default " + clusterId);
    }
  }

  @Override
  public Configuration getConfiguration() {
    return this.conf;
  }

  private void checkClosed() throws DoNotRetryIOException {
    if (this.closed) {
      throw new DoNotRetryIOException(toString() + " closed");
    }
  }

  /**
   * @return true if the master is running, throws an exception otherwise
   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
   * @deprecated this has been deprecated without a replacement
   */
  @Deprecated
  @Override
  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
    // When getting the master connection, we check it's running,
    // so if there is no exception, it means we've been able to get a
    // connection on a running master
    MasterKeepAliveConnection m;
    try {
      m = getKeepAliveMasterService();
    } catch (IOException e) {
      throw new MasterNotRunningException(e);
    }
    m.close();
    return true;
  }

  @Override
  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
      boolean reload) throws IOException {
    return reload ?
        relocateRegion(tableName, row) : locateRegion(tableName, row);
  }

  @Override
  public boolean isTableEnabled(TableName tableName) throws IOException {
    return getTableState(tableName).inStates(TableState.State.ENABLED);
  }

  @Override
  public boolean isTableDisabled(TableName tableName) throws IOException {
    return getTableState(tableName).inStates(TableState.State.DISABLED);
  }

  @Override
  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
      throws IOException {
    checkClosed();
    try {
      if (!isTableEnabled(tableName)) {
        LOG.debug("Table {} not enabled", tableName);
        return false;
      }
      if (TableName.isMetaTableName(tableName)) {
        // meta table is always available
        return true;
      }
      List<Pair<RegionInfo, ServerName>> locations =
          MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);

      int notDeployed = 0;
      int regionCount = 0;
      for (Pair<RegionInfo, ServerName> pair : locations) {
        RegionInfo info = pair.getFirst();
        if (pair.getSecond() == null) {
          LOG.debug("Table {} has not deployed region {}", tableName,
              pair.getFirst().getEncodedName());
          notDeployed++;
        } else if (splitKeys != null
            && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
          for (byte[] splitKey : splitKeys) {
            // Just check if the splitkey is available
            if (Bytes.equals(info.getStartKey(), splitKey)) {
              regionCount++;
              break;
            }
          }
        } else {
          // Always empty start row should be counted
          regionCount++;
        }
      }
      if (notDeployed > 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
        }
        return false;
      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
              splitKeys.length + 1, regionCount);
        }
        return false;
      } else {
        LOG.trace("Table {} should be available", tableName);
        return true;
      }
    } catch (TableNotFoundException tnfe) {
      LOG.warn("Table {} does not exist", tableName);
      return false;
    }
  }

  @Override
  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
        RegionInfo.getStartKey(regionName), false, true);
    return locations == null ?
        null : locations.getRegionLocation();
  }

  private boolean isDeadServer(ServerName sn) {
    if (clusterStatusListener == null) {
      return false;
    } else {
      return clusterStatusListener.isDeadServer(sn);
    }
  }

  @Override
  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
    return locateRegions(tableName, false, true);
  }

  @Override
  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
      boolean offlined) throws IOException {
    List<RegionInfo> regions;
    if (TableName.isMetaTableName(tableName)) {
      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
    } else {
      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
    }
    List<HRegionLocation> locations = new ArrayList<>();
    for (RegionInfo regionInfo : regions) {
      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
        continue;
      }
      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
      if (list != null) {
        for (HRegionLocation loc : list.getRegionLocations()) {
          if (loc != null) {
            locations.add(loc);
          }
        }
      }
    }
    return locations;
  }

  @Override
  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
      throws IOException {
    RegionLocations locations = locateRegion(tableName, row, true, true);
    return locations == null ? null : locations.getRegionLocation();
  }

  @Override
  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
      throws IOException {
    RegionLocations locations =
        relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
    return locations == null ? null
        : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  @Override
  public RegionLocations relocateRegion(final TableName tableName,
      final byte[] row, int replicaId) throws IOException {
    // Since this is an explicit request not to use any caching, finding
    // disabled tables should not be desirable. This will ensure that an exception is thrown
    // the first time a disabled table is interacted with.
    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
    }

    return locateRegion(tableName, row, false, true, replicaId);
  }

  @Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
      boolean retry) throws IOException {
    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  @Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
      boolean retry, int replicaId) throws IOException {
    checkClosed();
    if (tableName == null || tableName.getName().length == 0) {
      throw new IllegalArgumentException("table name cannot be null or zero length");
    }
    if (tableName.equals(TableName.META_TABLE_NAME)) {
      return locateMeta(tableName, useCache, replicaId);
    } else {
      // Region not in the cache - have to go to the meta RS
      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
    }
  }

  private RegionLocations locateMeta(final TableName tableName,
      boolean useCache, int replicaId) throws IOException {
    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
    // zookeeper with one request for every region lookup. We cache the META with empty row
    // key in MetaCache.
    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
    RegionLocations locations = null;
    if (useCache) {
      locations = getCachedLocation(tableName, metaCacheKey);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    }

    // only one thread should do the lookup.
    synchronized (metaRegionLock) {
      // Check the cache again for a hit in case some other thread made the
      // same query while we were waiting on the lock.
      if (useCache) {
        locations = getCachedLocation(tableName, metaCacheKey);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }

      // Look up from zookeeper
      locations = get(this.registry.getMetaRegionLocation());
      if (locations != null) {
        cacheLocation(tableName, locations);
      }
    }
    return locations;
  }

  /**
   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
   * seeking.
   */
  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
      boolean retry, int replicaId) throws IOException {
    // If we are supposed to be using the cache, look in the cache to see if we already have the
    // region.
    if (useCache) {
      RegionLocations locations = getCachedLocation(tableName, row);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    }
    // build the key of the meta region we should be looking for.
    // the extra 9's on the end are necessary to allow "exact" matches
    // without knowing the precise region names.
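    // Illustrative only (table and row names assumed): for table "t1" and row "r1" the reversed
    // meta scan starts at a key like "t1,r1,99999999999999" and walks backwards through
    // hbase:meta, so the first row returned is for the region whose start key is closest at or
    // below "r1".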
    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
    byte[] metaStopKey =
        RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
        .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
        .setReadType(ReadType.PREAD);
    if (this.useMetaReplicas) {
      s.setConsistency(Consistency.TIMELINE);
    }
    int maxAttempts = (retry ? numTries : 1);
    boolean relocateMeta = false;
    for (int tries = 0; ; tries++) {
      if (tries >= maxAttempts) {
        throw new NoServerForRegionException("Unable to find region for "
            + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
      }
      if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      } else {
        // If we are not supposed to be using the cache, delete any existing cached location
        // so it won't interfere.
        // We are only supposed to clean the cache for the specific replicaId
        metaCache.clearCache(tableName, row, replicaId);
      }
      // Query the meta region
      long pauseBase = this.pause;
      userRegionLock.lock();
      try {
        if (useCache) { // re-check cache after getting the lock
          RegionLocations locations = getCachedLocation(tableName, row);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
          }
        }
        if (relocateMeta) {
          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
              RegionInfo.DEFAULT_REPLICA_ID);
        }
        s.resetMvccReadPoint();
        try (ReversedClientScanner rcs =
            new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
                rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
          boolean tableNotFound = true;
          for (;;) {
            Result regionInfoRow = rcs.next();
            if (regionInfoRow == null) {
              if (tableNotFound) {
                throw new TableNotFoundException(tableName);
              } else {
                throw new IOException(
                    "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
              }
            }
            tableNotFound = false;
            // convert the row result into the HRegionLocation we need!
            RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
            if (locations == null || locations.getRegionLocation(replicaId) == null) {
              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
            }
            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
            if (regionInfo == null) {
              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME +
                  ", row=" + regionInfoRow);
            }
            // See HBASE-20182. It is possible that we locate to a split parent even after the
            // children are online, so here we need to skip this region and go to the next one.
            if (regionInfo.isSplitParent()) {
              continue;
            }
            if (regionInfo.isOffline()) {
              throw new RegionOfflineException("Region offline; disable table call? " +
                  regionInfo.getRegionNameAsString());
            }
            // It is possible that the split children have not come online yet and we have skipped
            // the parent in the above condition, so we may have already reached a region which
            // does not contain the row.
            if (!regionInfo.containsRow(row)) {
              throw new IOException(
                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
            }
            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
            if (serverName == null) {
              throw new NoServerForRegionException("No server address listed in " +
                  TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() +
                  " containing row " + Bytes.toStringBinary(row));
            }
            if (isDeadServer(serverName)) {
              throw new RegionServerStoppedException(
                  "hbase:meta says the region " + regionInfo.getRegionNameAsString() +
                  " is managed by the server " + serverName + ", but it is dead.");
            }
            // Instantiate the location
            cacheLocation(tableName, locations);
            return locations;
          }
        }
      } catch (TableNotFoundException e) {
        // if we got this error, probably means the table just plain doesn't
        // exist. rethrow the error immediately. this should always be coming
        // from the HTable constructor.
        throw e;
      } catch (IOException e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (e instanceof RemoteException) {
          e = ((RemoteException)e).unwrapRemoteException();
        }
        if (e instanceof CallQueueTooBigException) {
          // Give a special check on CallQueueTooBigException, see #HBASE-17114
          pauseBase = this.pauseForCQTBE;
        }
        if (tries < maxAttempts - 1) {
          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " +
              "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts,
              ConnectionUtils.getPauseTime(pauseBase, tries), e);
        } else {
          throw e;
        }
        // Only relocate the parent region if necessary
        relocateMeta =
            !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
      } finally {
        userRegionLock.unlock();
      }
      try {
        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Giving up trying to locate region in " +
            "meta: thread is interrupted.");
      }
    }
  }

  /**
   * Put a newly discovered HRegionLocation into the cache.
   * @param tableName The table name.
   * @param location the new location
   */
  @Override
  public void cacheLocation(final TableName tableName, final RegionLocations location) {
    metaCache.cacheLocation(tableName, location);
  }

  /**
   * Search the cache for a location that fits our table and row key.
   * Return null if no suitable region is located.
   * @return Null or region location found in cache.
   */
  RegionLocations getCachedLocation(final TableName tableName,
      final byte[] row) {
    return metaCache.getCachedLocation(tableName, row);
  }

  public void clearRegionCache(final TableName tableName, byte[] row) {
    metaCache.clearCache(tableName, row);
  }

  /*
   * Delete all cached entries of a table that maps to a specific location.
   */
  @Override
  public void clearCaches(final ServerName serverName) {
    metaCache.clearCache(serverName);
  }

  @Override
  public void clearRegionLocationCache() {
    metaCache.clearCache();
  }

  @Override
  public void clearRegionCache(final TableName tableName) {
    metaCache.clearCache(tableName);
  }

  /**
   * Put a newly discovered HRegionLocation into the cache.
   * @param tableName The table name.
   * @param source the source of the new location, if it's not coming from meta
   * @param location the new location
   */
  private void cacheLocation(final TableName tableName, final ServerName source,
      final HRegionLocation location) {
    metaCache.cacheLocation(tableName, source, location);
  }

  // Map keyed by service name + regionserver to service stub implementation
  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();

  /**
   * State of the MasterService connection/setup.
   */
  static class MasterServiceState {
    Connection connection;

    MasterProtos.MasterService.BlockingInterface stub;
    int userCount;

    MasterServiceState(final Connection connection) {
      super();
      this.connection = connection;
    }

    @Override
    public String toString() {
      return "MasterService";
    }

    Object getStub() {
      return this.stub;
    }

    void clearStub() {
      this.stub = null;
    }

    boolean isMasterRunning() throws IOException {
      MasterProtos.IsMasterRunningResponse response = null;
      try {
        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
      } catch (Exception e) {
        throw ProtobufUtil.handleRemoteException(e);
      }
      return response != null ? response.getIsMasterRunning() : false;
    }
  }

  /**
   * The record of errors for servers.
   */
  static class ServerErrorTracker {
    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>();
    private final long canRetryUntil;
    private final int maxTries; // max number to try
    private final long startTrackingTime;

    /**
     * Constructor
     * @param timeout how long to wait before timeout, in unit of millisecond
     * @param maxTries how many times to try
     */
    public ServerErrorTracker(long timeout, int maxTries) {
      this.maxTries = maxTries;
      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
      this.startTrackingTime = new Date().getTime();
    }

    /**
     * We stop retrying when we have exhausted BOTH the number of tries and the time allocated.
     * @param numAttempt how many times we have tried by now
     */
    boolean canTryMore(int numAttempt) {
      // If there is a single try we must not take into account the time.
      return numAttempt < maxTries || (maxTries > 1 &&
          EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
    }

    /**
     * Calculates the back-off time for a retrying request to a particular server.
     *
     * @param server The server in question.
     * @param basePause The default hci pause.
     * @return The time to wait before sending next request.
     */
    long calculateBackoffTime(ServerName server, long basePause) {
      long result;
      ServerErrors errorStats = errorsByServer.get(server);
      if (errorStats != null) {
        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
      } else {
        result = 0; // yes, if the server is not in our list we don't wait before retrying.
      }
      return result;
    }
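    // Illustrative only (assumes the standard HConstants.RETRY_BACKOFF multipliers): with a
    // 100 ms base pause, consecutive errors against one server back off roughly as
    // 100, 200, 300, 500, 1000, 2000, ... ms, because getPauseTime() scales the base pause by
    // the backoff-table entry for the current error count (plus a small random jitter).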

    /**
     * Reports that there was an error on the server, to do whatever bean-counting is necessary.
     * @param server The server in question.
     */
    void reportServerError(ServerName server) {
      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
    }

    long getStartTrackingTime() {
      return startTrackingTime;
    }

    /**
     * The record of errors for a server.
     */
    private static class ServerErrors {
      private final AtomicInteger retries = new AtomicInteger(0);

      public int getCount() {
        return retries.get();
      }

      public void addError() {
        retries.incrementAndGet();
      }
    }
  }

  /**
   * Class to make a MasterService stub.
   */
  private final class MasterServiceStubMaker {

    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
        throws IOException {
      try {
        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
      } catch (ServiceException e) {
        throw ProtobufUtil.handleRemoteException(e);
      }
    }

    /**
     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
     * services nor their interfaces. Let the caller do appropriate casting.
     * @return A stub for master services.
     */
    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
        throws IOException, KeeperException {
      ServerName sn = get(registry.getMasterAddress());
      if (sn == null) {
        String msg = "ZooKeeper available but no active master location found";
        LOG.info(msg);
        throw new MasterNotRunningException(msg);
      }
      if (isDeadServer(sn)) {
        throw new MasterNotRunningException(sn + " is dead.");
      }
      // Use the security info interface name as our stub key
      String key =
          getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange);
      MasterProtos.MasterService.BlockingInterface stub =
          (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
            return MasterProtos.MasterService.newBlockingStub(channel);
          });
      isMasterRunning(stub);
      return stub;
    }

    /**
     * Create a stub against the master. Retry if necessary.
     * @return A stub to do <code>intf</code> against the master
     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
     */
    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
      // The lock must be at the beginning to prevent multiple master creations
      // (and leaks) in a multithread context
      synchronized (masterLock) {
        Exception exceptionCaught = null;
        if (!closed) {
          try {
            return makeStubNoRetries();
          } catch (IOException e) {
            exceptionCaught = e;
          } catch (KeeperException e) {
            exceptionCaught = e;
          }
          throw new MasterNotRunningException(exceptionCaught);
        } else {
          throw new DoNotRetryIOException("Connection was closed while trying to get master");
        }
      }
    }
  }

  @Override
  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
    return getAdmin(get(registry.getMasterAddress()));
  }

  @Override
  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
      throws IOException {
    checkClosed();
    if (isDeadServer(serverName)) {
      throw new RegionServerStoppedException(serverName + " is dead.");
    }
    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName,
        this.hostnamesCanChange);
    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
      BlockingRpcChannel channel =
          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
      return AdminProtos.AdminService.newBlockingStub(channel);
    });
  }

  @Override
  public BlockingInterface getClient(ServerName serverName) throws IOException {
    checkClosed();
    if (isDeadServer(serverName)) {
      throw new RegionServerStoppedException(serverName + " is dead.");
    }
    String key = getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(),
        serverName, this.hostnamesCanChange);
    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
      BlockingRpcChannel channel =
          this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
      return ClientProtos.ClientService.newBlockingStub(channel);
    });
  }

  final MasterServiceState masterServiceState = new MasterServiceState(this);

  @Override
  public MasterKeepAliveConnection getMaster() throws IOException {
    return getKeepAliveMasterService();
  }

  private void resetMasterServiceState(final MasterServiceState mss) {
    mss.userCount++;
  }

  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
    synchronized (masterLock) {
      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
        this.masterServiceState.stub = stubMaker.makeStub();
      }
      resetMasterServiceState(this.masterServiceState);
    }
    // Ugly delegation just so we can add in a Close method.
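    // The anonymous MasterKeepAliveConnection below forwards every MasterService RPC to the
    // cached blocking stub captured here; its close() only releases this caller's keep-alive
    // reference (via release(mss)) rather than tearing down the shared master connection.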
    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
    return new MasterKeepAliveConnection() {
      MasterServiceState mss = masterServiceState;

      @Override
      public MasterProtos.AbortProcedureResponse abortProcedure(
          RpcController controller,
          MasterProtos.AbortProcedureRequest request) throws ServiceException {
        return stub.abortProcedure(controller, request);
      }

      @Override
      public MasterProtos.GetProceduresResponse getProcedures(
          RpcController controller,
          MasterProtos.GetProceduresRequest request) throws ServiceException {
        return stub.getProcedures(controller, request);
      }

      @Override
      public MasterProtos.GetLocksResponse getLocks(
          RpcController controller,
          MasterProtos.GetLocksRequest request) throws ServiceException {
        return stub.getLocks(controller, request);
      }

      @Override
      public MasterProtos.AddColumnResponse addColumn(
          RpcController controller,
          MasterProtos.AddColumnRequest request) throws ServiceException {
        return stub.addColumn(controller, request);
      }

      @Override
      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
          MasterProtos.DeleteColumnRequest request) throws ServiceException {
        return stub.deleteColumn(controller, request);
      }

      @Override
      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
          MasterProtos.ModifyColumnRequest request) throws ServiceException {
        return stub.modifyColumn(controller, request);
      }

      @Override
      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
          MasterProtos.MoveRegionRequest request) throws ServiceException {
        return stub.moveRegion(controller, request);
      }

      @Override
      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(
          RpcController controller, MasterProtos.MergeTableRegionsRequest request)
          throws ServiceException {
        return stub.mergeTableRegions(controller, request);
      }

      @Override
      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
          MasterProtos.AssignRegionRequest request) throws ServiceException {
        return stub.assignRegion(controller, request);
      }

      @Override
      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
          MasterProtos.UnassignRegionRequest request) throws ServiceException {
        return stub.unassignRegion(controller, request);
      }

      @Override
      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
          MasterProtos.OfflineRegionRequest request) throws ServiceException {
        return stub.offlineRegion(controller, request);
      }

      @Override
      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
          MasterProtos.SplitTableRegionRequest request) throws ServiceException {
        return stub.splitRegion(controller, request);
      }

      @Override
      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
          MasterProtos.DeleteTableRequest request) throws ServiceException {
        return stub.deleteTable(controller, request);
      }

      @Override
      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
          MasterProtos.TruncateTableRequest request) throws ServiceException {
        return stub.truncateTable(controller, request);
      }

      @Override
      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
          MasterProtos.EnableTableRequest request) throws ServiceException {
        return stub.enableTable(controller, request);
      }

      @Override
      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
          MasterProtos.DisableTableRequest request) throws ServiceException {
        return stub.disableTable(controller, request);
      }

      @Override
      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
          MasterProtos.ModifyTableRequest request) throws ServiceException {
        return stub.modifyTable(controller, request);
      }

      @Override
      public MasterProtos.CreateTableResponse createTable(RpcController controller,
          MasterProtos.CreateTableRequest request) throws ServiceException {
        return stub.createTable(controller, request);
      }

      @Override
      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
          MasterProtos.ShutdownRequest request) throws ServiceException {
        return stub.shutdown(controller, request);
      }

      @Override
      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
          MasterProtos.StopMasterRequest request) throws ServiceException {
        return stub.stopMaster(controller, request);
      }

      @Override
      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
          final RpcController controller,
          final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException {
        return stub.isMasterInMaintenanceMode(controller, request);
      }

      @Override
      public MasterProtos.BalanceResponse balance(RpcController controller,
          MasterProtos.BalanceRequest request) throws ServiceException {
        return stub.balance(controller, request);
      }

      @Override
      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(
          RpcController controller, MasterProtos.SetBalancerRunningRequest request)
          throws ServiceException {
        return stub.setBalancerRunning(controller, request);
      }

      @Override
      public NormalizeResponse normalize(RpcController controller,
          NormalizeRequest request) throws ServiceException {
        return stub.normalize(controller, request);
      }

      @Override
      public SetNormalizerRunningResponse setNormalizerRunning(
          RpcController controller, SetNormalizerRunningRequest request)
          throws ServiceException {
        return stub.setNormalizerRunning(controller, request);
      }

      @Override
      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
          MasterProtos.RunCatalogScanRequest request) throws ServiceException {
        return stub.runCatalogScan(controller, request);
      }

      @Override
      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
          RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
          throws ServiceException {
        return stub.enableCatalogJanitor(controller, request);
      }

      @Override
      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
          RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
          throws ServiceException {
        return stub.isCatalogJanitorEnabled(controller, request);
      }

      @Override
      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
MasterProtos.RunCleanerChoreRequest request) 1455 throws ServiceException { 1456 return stub.runCleanerChore(controller, request); 1457 } 1458 1459 @Override 1460 public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning( 1461 RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request) 1462 throws ServiceException { 1463 return stub.setCleanerChoreRunning(controller, request); 1464 } 1465 1466 @Override 1467 public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled( 1468 RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request) 1469 throws ServiceException { 1470 return stub.isCleanerChoreEnabled(controller, request); 1471 } 1472 1473 @Override 1474 public ClientProtos.CoprocessorServiceResponse execMasterService( 1475 RpcController controller, ClientProtos.CoprocessorServiceRequest request) 1476 throws ServiceException { 1477 return stub.execMasterService(controller, request); 1478 } 1479 1480 @Override 1481 public MasterProtos.SnapshotResponse snapshot(RpcController controller, 1482 MasterProtos.SnapshotRequest request) throws ServiceException { 1483 return stub.snapshot(controller, request); 1484 } 1485 1486 @Override 1487 public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots( 1488 RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request) 1489 throws ServiceException { 1490 return stub.getCompletedSnapshots(controller, request); 1491 } 1492 1493 @Override 1494 public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller, 1495 MasterProtos.DeleteSnapshotRequest request) throws ServiceException { 1496 return stub.deleteSnapshot(controller, request); 1497 } 1498 1499 @Override 1500 public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller, 1501 MasterProtos.IsSnapshotDoneRequest request) throws ServiceException { 1502 return stub.isSnapshotDone(controller, request); 1503 } 1504 1505 @Override 1506 public MasterProtos.RestoreSnapshotResponse restoreSnapshot( 1507 RpcController controller, MasterProtos.RestoreSnapshotRequest request) 1508 throws ServiceException { 1509 return stub.restoreSnapshot(controller, request); 1510 } 1511 1512 @Override 1513 public MasterProtos.ExecProcedureResponse execProcedure( 1514 RpcController controller, MasterProtos.ExecProcedureRequest request) 1515 throws ServiceException { 1516 return stub.execProcedure(controller, request); 1517 } 1518 1519 @Override 1520 public MasterProtos.ExecProcedureResponse execProcedureWithRet( 1521 RpcController controller, MasterProtos.ExecProcedureRequest request) 1522 throws ServiceException { 1523 return stub.execProcedureWithRet(controller, request); 1524 } 1525 1526 @Override 1527 public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller, 1528 MasterProtos.IsProcedureDoneRequest request) throws ServiceException { 1529 return stub.isProcedureDone(controller, request); 1530 } 1531 1532 @Override 1533 public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller, 1534 MasterProtos.GetProcedureResultRequest request) throws ServiceException { 1535 return stub.getProcedureResult(controller, request); 1536 } 1537 1538 @Override 1539 public MasterProtos.IsMasterRunningResponse isMasterRunning( 1540 RpcController controller, MasterProtos.IsMasterRunningRequest request) 1541 throws ServiceException { 1542 return stub.isMasterRunning(controller, request); 1543 } 1544 1545 @Override 1546 public MasterProtos.ModifyNamespaceResponse 
modifyNamespace(RpcController controller, 1547 MasterProtos.ModifyNamespaceRequest request) 1548 throws ServiceException { 1549 return stub.modifyNamespace(controller, request); 1550 } 1551 1552 @Override 1553 public MasterProtos.CreateNamespaceResponse createNamespace( 1554 RpcController controller, 1555 MasterProtos.CreateNamespaceRequest request) throws ServiceException { 1556 return stub.createNamespace(controller, request); 1557 } 1558 1559 @Override 1560 public MasterProtos.DeleteNamespaceResponse deleteNamespace( 1561 RpcController controller, 1562 MasterProtos.DeleteNamespaceRequest request) throws ServiceException { 1563 return stub.deleteNamespace(controller, request); 1564 } 1565 1566 @Override 1567 public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor( 1568 RpcController controller, 1569 MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException { 1570 return stub.getNamespaceDescriptor(controller, request); 1571 } 1572 1573 @Override 1574 public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors( 1575 RpcController controller, 1576 MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException { 1577 return stub.listNamespaceDescriptors(controller, request); 1578 } 1579 1580 @Override 1581 public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace( 1582 RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request) 1583 throws ServiceException { 1584 return stub.listTableDescriptorsByNamespace(controller, request); 1585 } 1586 1587 @Override 1588 public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace( 1589 RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request) 1590 throws ServiceException { 1591 return stub.listTableNamesByNamespace(controller, request); 1592 } 1593 1594 @Override 1595 public MasterProtos.GetTableStateResponse getTableState( 1596 RpcController controller, MasterProtos.GetTableStateRequest request) 1597 throws ServiceException { 1598 return stub.getTableState(controller, request); 1599 } 1600 1601 @Override 1602 public void close() { 1603 release(this.mss); 1604 } 1605 1606 @Override 1607 public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus( 1608 RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request) 1609 throws ServiceException { 1610 return stub.getSchemaAlterStatus(controller, request); 1611 } 1612 1613 @Override 1614 public MasterProtos.GetTableDescriptorsResponse getTableDescriptors( 1615 RpcController controller, MasterProtos.GetTableDescriptorsRequest request) 1616 throws ServiceException { 1617 return stub.getTableDescriptors(controller, request); 1618 } 1619 1620 @Override 1621 public MasterProtos.GetTableNamesResponse getTableNames( 1622 RpcController controller, MasterProtos.GetTableNamesRequest request) 1623 throws ServiceException { 1624 return stub.getTableNames(controller, request); 1625 } 1626 1627 @Override 1628 public MasterProtos.GetClusterStatusResponse getClusterStatus( 1629 RpcController controller, MasterProtos.GetClusterStatusRequest request) 1630 throws ServiceException { 1631 return stub.getClusterStatus(controller, request); 1632 } 1633 1634 @Override 1635 public MasterProtos.SetQuotaResponse setQuota( 1636 RpcController controller, MasterProtos.SetQuotaRequest request) 1637 throws ServiceException { 1638 return stub.setQuota(controller, request); 1639 } 1640 1641 @Override 1642 public 
MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp( 1643 RpcController controller, MasterProtos.MajorCompactionTimestampRequest request) 1644 throws ServiceException { 1645 return stub.getLastMajorCompactionTimestamp(controller, request); 1646 } 1647 1648 @Override 1649 public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion( 1650 RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request) 1651 throws ServiceException { 1652 return stub.getLastMajorCompactionTimestampForRegion(controller, request); 1653 } 1654 1655 @Override 1656 public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller, 1657 IsBalancerEnabledRequest request) throws ServiceException { 1658 return stub.isBalancerEnabled(controller, request); 1659 } 1660 1661 @Override 1662 public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled( 1663 RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request) 1664 throws ServiceException { 1665 return stub.setSplitOrMergeEnabled(controller, request); 1666 } 1667 1668 @Override 1669 public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled( 1670 RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request) 1671 throws ServiceException { 1672 return stub.isSplitOrMergeEnabled(controller, request); 1673 } 1674 1675 @Override 1676 public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller, 1677 IsNormalizerEnabledRequest request) throws ServiceException { 1678 return stub.isNormalizerEnabled(controller, request); 1679 } 1680 1681 @Override 1682 public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller, 1683 SecurityCapabilitiesRequest request) throws ServiceException { 1684 return stub.getSecurityCapabilities(controller, request); 1685 } 1686 1687 @Override 1688 public AddReplicationPeerResponse addReplicationPeer(RpcController controller, 1689 AddReplicationPeerRequest request) throws ServiceException { 1690 return stub.addReplicationPeer(controller, request); 1691 } 1692 1693 @Override 1694 public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller, 1695 RemoveReplicationPeerRequest request) throws ServiceException { 1696 return stub.removeReplicationPeer(controller, request); 1697 } 1698 1699 @Override 1700 public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller, 1701 EnableReplicationPeerRequest request) throws ServiceException { 1702 return stub.enableReplicationPeer(controller, request); 1703 } 1704 1705 @Override 1706 public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller, 1707 DisableReplicationPeerRequest request) throws ServiceException { 1708 return stub.disableReplicationPeer(controller, request); 1709 } 1710 1711 @Override 1712 public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller, 1713 ListDecommissionedRegionServersRequest request) throws ServiceException { 1714 return stub.listDecommissionedRegionServers(controller, request); 1715 } 1716 1717 @Override 1718 public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller, 1719 DecommissionRegionServersRequest request) throws ServiceException { 1720 return stub.decommissionRegionServers(controller, request); 1721 } 1722 1723 @Override 1724 public RecommissionRegionServerResponse recommissionRegionServer( 1725 RpcController controller, 
RecommissionRegionServerRequest request) 1726 throws ServiceException { 1727 return stub.recommissionRegionServer(controller, request); 1728 } 1729 1730 @Override 1731 public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller, 1732 GetReplicationPeerConfigRequest request) throws ServiceException { 1733 return stub.getReplicationPeerConfig(controller, request); 1734 } 1735 1736 @Override 1737 public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig( 1738 RpcController controller, UpdateReplicationPeerConfigRequest request) 1739 throws ServiceException { 1740 return stub.updateReplicationPeerConfig(controller, request); 1741 } 1742 1743 @Override 1744 public ListReplicationPeersResponse listReplicationPeers(RpcController controller, 1745 ListReplicationPeersRequest request) throws ServiceException { 1746 return stub.listReplicationPeers(controller, request); 1747 } 1748 1749 @Override 1750 public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes( 1751 RpcController controller, GetSpaceQuotaRegionSizesRequest request) 1752 throws ServiceException { 1753 return stub.getSpaceQuotaRegionSizes(controller, request); 1754 } 1755 1756 @Override 1757 public GetQuotaStatesResponse getQuotaStates( 1758 RpcController controller, GetQuotaStatesRequest request) throws ServiceException { 1759 return stub.getQuotaStates(controller, request); 1760 } 1761 1762 @Override 1763 public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller, 1764 MasterProtos.ClearDeadServersRequest request) throws ServiceException { 1765 return stub.clearDeadServers(controller, request); 1766 } 1767 1768 @Override 1769 public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, 1770 SwitchRpcThrottleRequest request) throws ServiceException { 1771 return stub.switchRpcThrottle(controller, request); 1772 } 1773 1774 @Override 1775 public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller, 1776 IsRpcThrottleEnabledRequest request) throws ServiceException { 1777 return stub.isRpcThrottleEnabled(controller, request); 1778 } 1779 1780 @Override 1781 public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller, 1782 SwitchExceedThrottleQuotaRequest request) throws ServiceException { 1783 return stub.switchExceedThrottleQuota(controller, request); 1784 } 1785 1786 @Override 1787 public AccessControlProtos.GrantResponse grant(RpcController controller, 1788 AccessControlProtos.GrantRequest request) throws ServiceException { 1789 return stub.grant(controller, request); 1790 } 1791 1792 @Override 1793 public AccessControlProtos.RevokeResponse revoke(RpcController controller, 1794 AccessControlProtos.RevokeRequest request) throws ServiceException { 1795 return stub.revoke(controller, request); 1796 } 1797 1798 @Override 1799 public GetUserPermissionsResponse getUserPermissions(RpcController controller, 1800 GetUserPermissionsRequest request) throws ServiceException { 1801 return stub.getUserPermissions(controller, request); 1802 } 1803 1804 @Override 1805 public HasUserPermissionsResponse hasUserPermissions(RpcController controller, 1806 HasUserPermissionsRequest request) throws ServiceException { 1807 return stub.hasUserPermissions(controller, request); 1808 } 1809 }; 1810 } 1811 1812 private static void release(MasterServiceState mss) { 1813 if (mss != null && mss.connection != null) { 1814 ((ConnectionImplementation)mss.connection).releaseMaster(mss); 1815 } 1816 } 1817 1818 private boolean 
isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) { 1819 if (mss.getStub() == null) { 1820 return false; 1821 } 1822 try { 1823 return mss.isMasterRunning(); 1824 } catch (UndeclaredThrowableException e) { 1825 // It's somewhat messy, but we can receive exceptions such as java.net.ConnectException 1826 // that are not declared by the interface, so they arrive wrapped; unwrap and log. 1827 LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable()); 1828 return false; 1829 } catch (IOException se) { 1830 LOG.warn("Checking master connection", se); 1831 return false; 1832 } 1833 } 1834 1835 void releaseMaster(MasterServiceState mss) { 1836 if (mss.getStub() == null) { 1837 return; 1838 } 1839 synchronized (masterLock) { 1840 --mss.userCount; 1841 } 1842 } 1843 1844 private void closeMasterService(MasterServiceState mss) { 1845 if (mss.getStub() != null) { 1846 LOG.info("Closing master protocol: " + mss); 1847 mss.clearStub(); 1848 } 1849 mss.userCount = 0; 1850 } 1851 1852 /** 1853 * Immediate close of the shared master. Can be triggered by the delayed close or by closing 1854 * the connection itself. 1855 */ 1856 private void closeMaster() { 1857 synchronized (masterLock) { 1858 closeMasterService(masterServiceState); 1859 } 1860 } 1861 1862 void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) { 1863 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum); 1864 cacheLocation(hri.getTable(), source, newHrl); 1865 } 1866 1867 @Override 1868 public void deleteCachedRegionLocation(final HRegionLocation location) { 1869 metaCache.clearCache(location); 1870 } 1871 1872 /** 1873 * Update the location with the new value (if the exception is a RegionMovedException) 1874 * or delete it from the cache. Does nothing if we can be sure from the exception that 1875 * the location is still accurate, or if the cache has already been updated. 1876 * @param exception an object (to simplify user code) in which we look for a nested or 1877 * wrapped RegionMovedException 1878 * @param source server that is the source of the location update. 1879 */ 1880 @Override 1881 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey, 1882 final Object exception, final ServerName source) { 1883 if (rowkey == null || tableName == null) { 1884 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : Bytes.toStringBinary(rowkey)) + 1885 ", tableName=" + (tableName == null ? "null" : tableName)); 1886 return; 1887 } 1888 1889 if (source == null) { 1890 // This should not happen, but let's be defensive. 1891 return; 1892 } 1893 1894 if (regionName == null) { 1895 // we do not know which region, so just remove the cache entry for the row and server 1896 if (metrics != null) { 1897 metrics.incrCacheDroppingExceptions(exception); 1898 } 1899 metaCache.clearCache(tableName, rowkey, source); 1900 return; 1901 } 1902 1903 // Is it something we have already updated? 1904 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey); 1905 HRegionLocation oldLocation = null; 1906 if (oldLocations != null) { 1907 oldLocation = oldLocations.getRegionLocationByRegionName(regionName); 1908 } 1909 if (oldLocation == null || !source.equals(oldLocation.getServerName())) { 1910 // There is no such location in the cache (it's been removed already) or 1911 // the cache has already been refreshed with a different location; nothing to do.
1912 return; 1913 } 1914 // The cached entry is still current for this source: decide from the exception whether to keep it, update it, or drop it. 1915 RegionInfo regionInfo = oldLocation.getRegion(); 1916 Throwable cause = ClientExceptionsUtil.findException(exception); 1917 if (cause != null) { 1918 if (!ClientExceptionsUtil.isMetaClearingException(cause)) { 1919 // We know that the region is still on this region server 1920 return; 1921 } 1922 1923 if (cause instanceof RegionMovedException) { 1924 RegionMovedException rme = (RegionMovedException) cause; 1925 if (LOG.isTraceEnabled()) { 1926 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " + 1927 rme.getHostname() + ":" + rme.getPort() + 1928 " according to " + source.getAddress()); 1929 } 1930 // We know that the region is no longer on this region server, but we know 1931 // the new location. 1932 updateCachedLocation( 1933 regionInfo, source, rme.getServerName(), rme.getLocationSeqNum()); 1934 return; 1935 } 1936 } 1937 1938 if (metrics != null) { 1939 metrics.incrCacheDroppingExceptions(exception); 1940 } 1941 1942 // If we're here, it means that we cannot be sure about the location, so we remove it from 1943 // the cache. Do not pass the source, because the source can be a new server at the same host:port. 1944 metaCache.clearCache(regionInfo); 1945 } 1946 1947 @Override 1948 public AsyncProcess getAsyncProcess() { 1949 return asyncProcess; 1950 } 1951 1952 @Override 1953 public ServerStatisticTracker getStatisticsTracker() { 1954 return this.stats; 1955 } 1956 1957 @Override 1958 public ClientBackoffPolicy getBackoffPolicy() { 1959 return this.backoffPolicy; 1960 } 1961 1962 /* 1963 * Return the number of cached regions for a table. It will only be called 1964 * from a unit test. 1965 */ 1966 @VisibleForTesting 1967 int getNumberOfCachedRegionLocations(final TableName tableName) { 1968 return metaCache.getNumberOfCachedRegionLocations(tableName); 1969 } 1970 1971 @Override 1972 public void abort(final String msg, Throwable t) { 1973 if (t != null) { 1974 LOG.error(HBaseMarkers.FATAL, msg, t); 1975 } else { 1976 LOG.error(HBaseMarkers.FATAL, msg); 1977 } 1978 this.aborted = true; 1979 close(); 1980 this.closed = true; 1981 } 1982 1983 @Override 1984 public boolean isClosed() { 1985 return this.closed; 1986 } 1987 1988 @Override 1989 public boolean isAborted() { 1990 return this.aborted; 1991 } 1992 1993 @Override 1994 public void close() { 1995 if (this.closed) { 1996 return; 1997 } 1998 closeMaster(); 1999 shutdownPools(); 2000 if (this.metrics != null) { 2001 this.metrics.shutdown(); 2002 } 2003 this.closed = true; 2004 registry.close(); 2005 this.stubs.clear(); 2006 if (clusterStatusListener != null) { 2007 clusterStatusListener.close(); 2008 } 2009 if (rpcClient != null) { 2010 rpcClient.close(); 2011 } 2012 if (authService != null) { 2013 authService.shutdown(); 2014 } 2015 } 2016 2017 /** 2018 * Close the connection for good. On the off chance that someone is unable to close 2019 * the connection, perhaps because it bailed out prematurely, the method 2020 * below will ensure that this instance is cleaned up. 2021 * Caveat: The JVM may take an unknown amount of time to call finalize on an 2022 * unreachable object, so our hope is that every consumer cleans up after 2023 * itself, like any good citizen.
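 * <p>Rather than relying on this finalizer, callers should scope the connection themselves so
 * that {@link #close()} always runs. A minimal sketch (assuming an ordinary client
 * {@code Configuration} in {@code conf}):
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf)) {
 *   // use the connection; Tables and Admins obtained from it are closed by their owners
 * } // close() runs here, leaving this finalizer as a no-op safety net
 * }</pre>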
2024 */ 2025 @Override 2026 protected void finalize() throws Throwable { 2027 super.finalize(); 2028 close(); 2029 } 2030 2031 @Override 2032 public NonceGenerator getNonceGenerator() { 2033 return nonceGenerator; 2034 } 2035 2036 @Override 2037 public TableState getTableState(TableName tableName) throws IOException { 2038 checkClosed(); 2039 TableState tableState = MetaTableAccessor.getTableState(this, tableName); 2040 if (tableState == null) { 2041 throw new TableNotFoundException(tableName); 2042 } 2043 return tableState; 2044 } 2045 2046 @Override 2047 public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { 2048 return RpcRetryingCallerFactory 2049 .instantiate(conf, this.interceptor, this.getStatisticsTracker()); 2050 } 2051 2052 @Override 2053 public boolean hasCellBlockSupport() { 2054 return this.rpcClient.hasCellBlockSupport(); 2055 } 2056 2057 @Override 2058 public ConnectionConfiguration getConnectionConfiguration() { 2059 return this.connectionConfig; 2060 } 2061 2062 @Override 2063 public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { 2064 return this.rpcCallerFactory; 2065 } 2066 2067 @Override 2068 public RpcControllerFactory getRpcControllerFactory() { 2069 return this.rpcControllerFactory; 2070 } 2071 2072 private static <T> T get(CompletableFuture<T> future) throws IOException { 2073 try { 2074 return future.get(); 2075 } catch (InterruptedException e) { 2076 Thread.currentThread().interrupt(); 2077 throw (IOException) new InterruptedIOException().initCause(e); 2078 } catch (ExecutionException e) { 2079 Throwable cause = e.getCause(); 2080 Throwables.propagateIfPossible(cause, IOException.class); 2081 throw new IOException(cause); 2082 } 2083 } 2084}
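// Usage sketch for the get(...) helper above (illustrative only, not part of this class):
// a blocking accessor can delegate to an async call and surface failures as IOException,
// assuming a field like this connection's 'registry' whose methods return CompletableFuture, e.g.
//
//   String clusterId() throws IOException {
//     // get(...) restores the interrupt flag on InterruptedException and unwraps
//     // ExecutionException causes, rethrowing IOExceptions as-is and wrapping
//     // anything else in a new IOException.
//     return get(registry.getClusterId());
//   }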