/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx;

import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.CatalogReplicaMode;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FlushMasterStoreResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnStoreFileTrackerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableStoreFileTrackerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;

/**
 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces. Encapsulates
 * connection to zookeeper and regionservers.
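 * <p>
 * A minimal usage sketch (the table name below is illustrative); callers normally obtain a
 * {@link Connection} through {@link ConnectionFactory} rather than instantiating this class
 * directly:
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * try (Connection connection = ConnectionFactory.createConnection(conf);
 *   Table table = connection.getTable(TableName.valueOf("example_table"))) {
 *   // issue gets/puts/scans against the table
 * }
 * </pre>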
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
    value = "AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
    justification = "Access to the concurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
public class ConnectionImplementation implements ClusterConnection, Closeable {
  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

  // The mode tells if HedgedRead, LoadBalance mode is supported.
  // The default mode is CatalogReplicaMode.None.
  private CatalogReplicaMode metaReplicaMode;
  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;

  private final int metaReplicaCallTimeoutScanInMicroSecond;
  private final int numTries;
  final int rpcTimeout;

  /**
   * Global nonceGenerator shared per client. Currently there's no reason to limit its scope. Once
   * it's set under nonceGeneratorCreateLock, it is never unset or changed.
   */
  // XXX: It is a bad pattern to assign a value to a static field from a constructor. However
  // it would likely change semantics if we change it because the NonceGenerator is selected
  // from configuration passed in as a parameter of the constructor. This has been cleaned up
  // in later branches.
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken when creating Connection, which gets a private copy. */
  private static final Object nonceGeneratorCreateLock = new Object();

  private final AsyncProcess asyncProcess;
  // single tracker per connection
  private final ServerStatisticTracker stats;

  private volatile boolean closed;
  private volatile boolean aborted;

  // package protected for the tests
  ClusterStatusListener clusterStatusListener;

  private final Object metaRegionLock = new Object();

  private final Object masterLock = new Object();

  // thread executor shared by all Table instances created
  // by this connection
  private volatile ThreadPoolExecutor batchPool = null;
  // meta thread executor shared by all Table instances created
  // by this connection
  private volatile ThreadPoolExecutor metaLookupPool = null;
  private volatile boolean cleanupPool = false;

  private final Configuration conf;

  // cache the configuration value for tables so that we can avoid calling
  // the expensive Configuration to fetch the value multiple times.
  private final ConnectionConfiguration connectionConfig;

  // Client rpc instance.
  private final RpcClient rpcClient;

  private final MetaCache metaCache;
  private final MetricsConnection metrics;

  protected User user;

  private final RpcRetryingCallerFactory rpcCallerFactory;

  private final RpcControllerFactory rpcControllerFactory;

  private final RetryingCallerInterceptor interceptor;

  /**
   * Cluster registry of basic info such as clusterid and meta region location.
   */
  private final ConnectionRegistry registry;

  private final ClientBackoffPolicy backoffPolicy;

  /**
   * Allow setting an alternate BufferedMutator implementation via config. If null, use default.
   */
  private final String alternateBufferedMutatorClassName;

  /** lock guards against multiple threads trying to query the meta region at the same time */
  private final ReentrantLock userRegionLock = new ReentrantLock();

  private ChoreService choreService;

  /**
   * constructor
   * @param conf Configuration object
   */
  ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException {
    this(conf, pool, user, null);
  }

  /**
   * Constructor, for creating cluster connection with provided ConnectionRegistry.
   */
  ConnectionImplementation(Configuration conf, ExecutorService pool, User user,
    ConnectionRegistry registry) throws IOException {
    this.conf = conf;
    this.user = user;
    if (user != null && user.isLoginFromKeytab()) {
      spawnRenewalChore(user.getUGI());
    }
    this.batchPool = (ThreadPoolExecutor) pool;
    this.connectionConfig = new ConnectionConfiguration(conf);
    this.closed = false;
    this.metaReplicaCallTimeoutScanInMicroSecond =
      connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();

    // how many times to try, one more than max *retry* time
    this.numTries = retries2Attempts(connectionConfig.getRetriesNumber());
    this.rpcTimeout =
      conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
    if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) {
      synchronized (nonceGeneratorCreateLock) {
        if (nonceGenerator == null) {
          nonceGenerator = PerClientRandomNonceGenerator.get();
        }
      }
    } else {
      nonceGenerator = NO_NONCE_GENERATOR;
    }

    this.stats = ServerStatisticTracker.create(conf);
    this.interceptor = new RetryingCallerInterceptorFactory(conf).build();
    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
    this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory);

    boolean shouldListen =
      conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
    Class<? extends ClusterStatusListener.Listener> listenerClass =
      conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
        ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);

    // Is there an alternate BufferedMutator to use?
    this.alternateBufferedMutatorClassName = this.conf.get(BufferedMutator.CLASSNAME_KEY);

    try {
      if (registry == null) {
        this.registry = ConnectionRegistryFactory.getRegistry(conf);
      } else {
        this.registry = registry;
      }
      retrieveClusterId();

      if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
        String scope = MetricsConnection.getScope(conf, clusterId, this);
        this.metrics = new MetricsConnection(scope, this::getBatchPool, this::getMetaLookupPool);
      } else {
        this.metrics = null;
      }
      this.metaCache = new MetaCache(this.metrics);

      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);

      // Do we publish the status?
      if (shouldListen) {
        if (listenerClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but "
            + ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
        } else {
          clusterStatusListener =
            new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
              @Override
              public void newDead(ServerName sn) {
                clearCaches(sn);
                rpcClient.cancelConnections(sn);
              }
            }, conf, listenerClass);
        }
      }
    } catch (Throwable e) {
      // avoid leaks: registry, rpcClient, ...
      LOG.debug("connection construction failed", e);
      close();
      throw e;
    }

    // Get the region locator's meta replica mode.
    this.metaReplicaMode = CatalogReplicaMode
      .fromString(conf.get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));

    switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        String replicaSelectorClass =
          conf.get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
            CatalogReplicaLoadBalanceSimpleSelector.class.getName());

        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory
          .createSelector(replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> {
            int numOfReplicas = 1;
            try {
              RegionLocations metaLocations = this.registry.getMetaRegionLocations()
                .get(connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
              numOfReplicas = metaLocations.size();
            } catch (Exception e) {
              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
            }
            return numOfReplicas;
          });
        break;
      case NONE:
        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.

        boolean useMetaReplicas = conf.getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
        if (useMetaReplicas) {
          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
        }
        break;
      default:
        // Doing nothing
    }
  }

  private void spawnRenewalChore(final UserGroupInformation user) {
    ChoreService service = getChoreService();
    service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf));
  }

  /**
   * @param conn The connection for which to replace the generator.
   * @param cnm  Replaces the nonce generator used, for testing.
   * @return old nonce generator.
   */
  static NonceGenerator injectNonceGeneratorForTesting(ClusterConnection conn, NonceGenerator cnm) {
    ConnectionImplementation connImpl = (ConnectionImplementation) conn;
    NonceGenerator ng = connImpl.getNonceGenerator();
    LOG.warn("Nonce generator is being replaced by test code for " + cnm.getClass().getName());
    nonceGenerator = cnm;
    return ng;
  }

  @Override
  public Table getTable(TableName tableName) throws IOException {
    return getTable(tableName, getBatchPool());
  }

  @Override
  public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
    return new TableBuilderBase(tableName, connectionConfig) {

      @Override
      public Table build() {
        return new HTable(ConnectionImplementation.this, this, rpcCallerFactory,
          rpcControllerFactory, pool);
      }
    };
  }

  @Override
  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
    if (params.getTableName() == null) {
      throw new IllegalArgumentException("TableName cannot be null.");
    }
    if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
      params.writeBufferSize(connectionConfig.getWriteBufferSize());
    }
    if (params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) {
      params.setWriteBufferPeriodicFlushTimeoutMs(
        connectionConfig.getWriteBufferPeriodicFlushTimeoutMs());
    }
    if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) {
      params.setWriteBufferPeriodicFlushTimerTickMs(
        connectionConfig.getWriteBufferPeriodicFlushTimerTickMs());
    }
    if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
      params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
    }
    // Look to see if an alternate BufferedMutator implementation is wanted.
    // Look in params and in config. If null, use default.
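    // An alternate implementation can be selected either per-mutator via the params'
    // implementation class name, or connection-wide via the BufferedMutator.CLASSNAME_KEY
    // configuration property (read into alternateBufferedMutatorClassName in the constructor).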
    String implementationClassName = params.getImplementationClassName();
    if (implementationClassName == null) {
      implementationClassName = this.alternateBufferedMutatorClassName;
    }
    if (implementationClassName == null) {
      return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
    }
    try {
      return (BufferedMutator) ReflectionUtils.newInstance(Class.forName(implementationClassName),
        this, rpcCallerFactory, rpcControllerFactory, params);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public BufferedMutator getBufferedMutator(TableName tableName) {
    return getBufferedMutator(new BufferedMutatorParams(tableName));
  }

  @Override
  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
    return new HRegionLocator(tableName, this);
  }

  @Override
  public Admin getAdmin() throws IOException {
    return new HBaseAdmin(this);
  }

  @Override
  public Hbck getHbck() throws IOException {
    return TraceUtil.trace(() -> getHbck(get(registry.getActiveMaster())),
      () -> TraceUtil.createSpan(this.getClass().getSimpleName() + ".getHbck"));
  }

  @Override
  public Hbck getHbck(ServerName masterServer) throws IOException {
    return TraceUtil.trace(() -> {
      checkClosed();
      if (isDeadServer(masterServer)) {
        throw new RegionServerStoppedException(masterServer + " is dead.");
      }
      String key =
        getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), masterServer);

      return new HBaseHbck(
        (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
          BlockingRpcChannel channel =
            this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout);
          return MasterProtos.HbckService.newBlockingStub(channel);
        }), rpcControllerFactory);
    }, () -> TraceUtil.createSpan(this.getClass().getSimpleName() + ".getHbck")
      .setAttribute(HBaseSemanticAttributes.SERVER_NAME_KEY, masterServer.getServerName()));
  }

  @Override
  public MetricsConnection getConnectionMetrics() {
    return this.metrics;
  }

  @Override
  public User getUser() {
    return user;
  }

  @Override
  public ConnectionRegistry getConnectionRegistry() {
    return registry;
  }

  private ThreadPoolExecutor getBatchPool() {
    if (batchPool == null) {
      synchronized (this) {
        if (batchPool == null) {
          int threads = conf.getInt("hbase.hconnection.threads.max", 256);
          this.batchPool = getThreadPool(threads, threads, "-shared", null);
          this.cleanupPool = true;
        }
      }
    }
    return this.batchPool;
  }

  private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint,
    BlockingQueue<Runnable> passedWorkQueue) {
    // shared HTable thread executor not yet initialized
    if (maxThreads == 0) {
      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    if (coreThreads == 0) {
      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
    }
    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
    BlockingQueue<Runnable> workQueue = passedWorkQueue;
    if (workQueue == null) {
      workQueue =
        new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
          HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
      coreThreads = maxThreads;
    }
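    // When no work queue is passed in, coreThreads is forced equal to maxThreads and the queue is
    // bounded at maxThreads * HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, so the pool grows to at
    // most maxThreads and further submissions queue up. allowCoreThreadTimeOut below lets idle
    // threads exit after keepAliveTime seconds instead of lingering.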
    ThreadPoolExecutor tpe =
      new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
        new ThreadFactoryBuilder().setNameFormat(toString() + nameHint + "-pool-%d")
          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
    tpe.allowCoreThreadTimeOut(true);
    return tpe;
  }

  private ThreadPoolExecutor getMetaLookupPool() {
    if (this.metaLookupPool == null) {
      synchronized (this) {
        if (this.metaLookupPool == null) {
          // Some of the threads would be used for meta replicas
          // To start with, threads.max.core threads can hit the meta (including replicas).
          // After that, requests will get queued up in the passed queue, and only after
          // the queue is full, a new thread will be started
          int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
          this.metaLookupPool =
            getThreadPool(threads, threads, "-metaLookup-shared-", new LinkedBlockingQueue<>());
        }
      }
    }
    return this.metaLookupPool;
  }

  protected ExecutorService getCurrentMetaLookupPool() {
    return metaLookupPool;
  }

  protected ExecutorService getCurrentBatchPool() {
    return batchPool;
  }

  private void shutdownPools() {
    if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
      shutdownBatchPool(this.batchPool);
    }
    if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
      shutdownBatchPool(this.metaLookupPool);
    }
  }

  private void shutdownBatchPool(ExecutorService pool) {
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        pool.shutdownNow();
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
    }
  }

  /**
   * For tests only.
   */
  RpcClient getRpcClient() {
    return rpcClient;
  }

  /**
   * An identifier that will remain the same for a given connection.
   */
  @Override
  public String toString() {
    return "hconnection-0x" + Integer.toHexString(hashCode());
  }

  protected String clusterId = null;

  protected void retrieveClusterId() {
    if (clusterId != null) {
      return;
    }
    try {
      this.clusterId = this.registry.getClusterId().get();
    } catch (InterruptedException | ExecutionException e) {
      LOG.warn("Retrieve cluster id failed", e);
    }
    if (clusterId == null) {
      clusterId = HConstants.CLUSTER_ID_DEFAULT;
      LOG.debug("clusterid came back null, using default " + clusterId);
    }
  }

  /**
   * If choreService has not been created yet, create the ChoreService.
   */
  synchronized ChoreService getChoreService() {
    if (choreService == null) {
      choreService = new ChoreService("AsyncConn Chore Service");
    }
    return choreService;
  }

  @Override
  public Configuration getConfiguration() {
    return this.conf;
  }

  private void checkClosed() throws LocalConnectionClosedException {
    if (this.closed) {
      throw new LocalConnectionClosedException(toString() + " closed");
    }
  }

  /**
   * Like {@link ConnectionClosedException} but thrown from the checkClosed call which looks at the
   * local this.closed flag. We use this rather than {@link ConnectionClosedException} because the
   * latter does not inherit from DoNotRetryIOE (it should. TODO).
   */
  private static class LocalConnectionClosedException extends DoNotRetryIOException {
    LocalConnectionClosedException(String message) {
      super(message);
    }
  }

  /**
   * @return true if the master is running, throws an exception otherwise
   * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running
   * @deprecated this has been deprecated without a replacement
   */
  @Deprecated
  @Override
  public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
    // When getting the master connection, we check it's running,
    // so if there is no exception, it means we've been able to get a
    // connection on a running master
    MasterKeepAliveConnection m;
    try {
      m = getKeepAliveMasterService();
    } catch (IOException e) {
      throw new MasterNotRunningException(e);
    }
    m.close();
    return true;
  }

  @Override
  public HRegionLocation getRegionLocation(final TableName tableName, final byte[] row,
    boolean reload) throws IOException {
    return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row);
  }

  @Override
  public boolean isTableEnabled(TableName tableName) throws IOException {
    return getTableState(tableName).inStates(TableState.State.ENABLED);
  }

  @Override
  public boolean isTableDisabled(TableName tableName) throws IOException {
    return getTableState(tableName).inStates(TableState.State.DISABLED);
  }

  @Override
  public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
    throws IOException {
    checkClosed();
    try {
      if (!isTableEnabled(tableName)) {
        LOG.debug("Table {} not enabled", tableName);
        return false;
      }
      if (TableName.isMetaTableName(tableName)) {
        // meta table is always available
        return true;
      }
      List<Pair<RegionInfo, ServerName>> locations =
        MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);

      int notDeployed = 0;
      int regionCount = 0;
      for (Pair<RegionInfo, ServerName> pair : locations) {
        RegionInfo info = pair.getFirst();
        if (pair.getSecond() == null) {
          LOG.debug("Table {} has not deployed region {}", tableName,
            pair.getFirst().getEncodedName());
          notDeployed++;
        } else
          if (splitKeys != null && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
            for (byte[] splitKey : splitKeys) {
              // Just check if the splitkey is available
              if (Bytes.equals(info.getStartKey(), splitKey)) {
                regionCount++;
                break;
              }
            }
          } else {
            // Always empty start row should be counted
            regionCount++;
          }
      }
      if (notDeployed > 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed);
        }
        return false;
      } else if (splitKeys != null && regionCount != splitKeys.length + 1) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Table {} expected to have {} regions, but only {} available", tableName,
            splitKeys.length + 1, regionCount);
        }
        return false;
      } else {
        LOG.trace("Table {} should be available", tableName);
        return true;
      }
    } catch (TableNotFoundException tnfe) {
      LOG.warn("Table {} does not exist", tableName);
      return false;
    }
  }

  @Override
  public HRegionLocation locateRegion(final byte[] regionName) throws IOException {
    RegionLocations locations = locateRegion(RegionInfo.getTable(regionName),
      RegionInfo.getStartKey(regionName), false, true);
    return locations == null ? null : locations.getRegionLocation();
  }

  private boolean isDeadServer(ServerName sn) {
    if (clusterStatusListener == null) {
      return false;
    } else {
      return clusterStatusListener.isDeadServer(sn);
    }
  }

  @Override
  public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
    return locateRegions(tableName, false, true);
  }

  @Override
  public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache,
    boolean offlined) throws IOException {
    List<RegionInfo> regions;
    if (TableName.isMetaTableName(tableName)) {
      regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO);
    } else {
      regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined);
    }
    List<HRegionLocation> locations = new ArrayList<>();
    for (RegionInfo regionInfo : regions) {
      if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) {
        continue;
      }
      RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true);
      if (list != null) {
        for (HRegionLocation loc : list.getRegionLocations()) {
          if (loc != null) {
            locations.add(loc);
          }
        }
      }
    }
    return locations;
  }

  @Override
  public HRegionLocation locateRegion(final TableName tableName, final byte[] row)
    throws IOException {
    RegionLocations locations = locateRegion(tableName, row, true, true);
    return locations == null ? null : locations.getRegionLocation();
  }

  @Override
  public HRegionLocation relocateRegion(final TableName tableName, final byte[] row)
    throws IOException {
    RegionLocations locations =
      relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
    return locations == null
      ? null
      : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  @Override
  public RegionLocations relocateRegion(final TableName tableName, final byte[] row, int replicaId)
    throws IOException {
    // Since this is an explicit request not to use any caching, finding
    // disabled tables should not be desirable. This will ensure that an exception is thrown
    // the first time a disabled table is interacted with.
    if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) {
      throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
    }

    return locateRegion(tableName, row, false, true, replicaId);
  }

  @Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
    boolean retry) throws IOException {
    return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
  }

  @Override
  public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache,
    boolean retry, int replicaId) throws IOException {
    checkClosed();
    if (tableName == null || tableName.getName().length == 0) {
      throw new IllegalArgumentException("table name cannot be null or zero length");
    }
    if (tableName.equals(TableName.META_TABLE_NAME)) {
      return locateMeta(tableName, useCache, replicaId);
    } else {
      // Region not in the cache - have to go to the meta RS
      return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
    }
  }

  private RegionLocations locateMeta(final TableName tableName, boolean useCache, int replicaId)
    throws IOException {
    // HBASE-10785: We cache the location of the META itself, so that we are not overloading
    // zookeeper with one request for every region lookup. We cache the META with empty row
    // key in MetaCache.
    byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
    RegionLocations locations = null;
    if (useCache) {
      locations = getCachedLocation(tableName, metaCacheKey);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    }

    // only one thread should do the lookup.
    synchronized (metaRegionLock) {
      // Check the cache again for a hit in case some other thread made the
      // same query while we were waiting on the lock.
      if (useCache) {
        locations = getCachedLocation(tableName, metaCacheKey);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      }

      // Look up from zookeeper
      locations = get(this.registry.getMetaRegionLocations());
      if (locations != null) {
        cacheLocation(tableName, locations);
      }
    }
    return locations;
  }

  /**
   * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're
   * seeking.
   */
  private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache,
    boolean retry, int replicaId) throws IOException {
    // If we are supposed to be using the cache, look in the cache to see if we already have the
    // region.
    if (useCache) {
      RegionLocations locations = getCachedLocation(tableName, row);
      if (locations != null && locations.getRegionLocation(replicaId) != null) {
        return locations;
      }
    }
    // build the key of the meta region we should be looking for.
    // the extra 9's on the end are necessary to allow "exact" matches
    // without knowing the precise region names.
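    // For example, for table 't1' and row 'r1' the reversed scan built below starts at a key of
    // roughly the form "t1,r1,<HConstants.NINES>" and stops at "t1,,", walking hbase:meta
    // backwards until it reaches the entry of the region containing 'r1' (keys shown are
    // illustrative; real meta row keys end with the region id).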
    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
    byte[] metaStopKey =
      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
    Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
      .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(1)
      .setReadType(ReadType.PREAD);

    switch (this.metaReplicaMode) {
      case LOAD_BALANCE:
        int metaReplicaId =
          this.metaReplicaSelector.select(tableName, row, RegionLocateType.CURRENT);
        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
          // If the selector gives a non-primary meta replica region, then go with it.
          // Otherwise, just go to primary in non-hedgedRead mode.
          s.setConsistency(Consistency.TIMELINE);
          s.setReplicaId(metaReplicaId);
        }
        break;
      case HEDGED_READ:
        s.setConsistency(Consistency.TIMELINE);
        break;
      default:
        // do nothing
    }
    int maxAttempts = (retry ? numTries : 1);
    boolean relocateMeta = false;
    for (int tries = 0;; tries++) {
      if (tries >= maxAttempts) {
        throw new NoServerForRegionException("Unable to find region for "
          + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries.");
      }
      if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
      } else {
        // If we are not supposed to be using the cache, delete any existing cached location
        // so it won't interfere.
        // We are only supposed to clean the cache for the specific replicaId
        metaCache.clearCache(tableName, row, replicaId);
      }
      // Query the meta region
      long pauseBase = connectionConfig.getPauseMillis();
      takeUserRegionLock();
      try {
        // We don't need to check if useCache is enabled or not. Even if useCache is false
        // we already cleared the cache for this row before acquiring userRegion lock so if this
        // row is present in cache that means some other thread has populated it while we were
        // waiting to acquire user region lock.
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;
        }
        if (relocateMeta) {
          relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
            RegionInfo.DEFAULT_REPLICA_ID);
        }
        s.resetMvccReadPoint();
        final Span span = new TableOperationSpanBuilder(this)
          .setTableName(TableName.META_TABLE_NAME).setOperation(s).build();
        try (Scope ignored = span.makeCurrent();
          ReversedClientScanner rcs =
            new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
              rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(),
              connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond)) {
          boolean tableNotFound = true;
          for (;;) {
            Result regionInfoRow = rcs.next();
            if (regionInfoRow == null) {
              if (tableNotFound) {
                throw new TableNotFoundException(tableName);
              } else {
                throw new IOException(
                  "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
              }
            }
            tableNotFound = false;
            // convert the row result into the HRegionLocation we need!
            locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
            if (locations == null || locations.getRegionLocation(replicaId) == null) {
              throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
            }
            RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion();
            if (regionInfo == null) {
              throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME
                + ", row=" + regionInfoRow);
            }
            // See HBASE-20182. It is possible that we locate to a split parent even after the
            // children are online, so here we need to skip this region and go to the next one.
            if (regionInfo.isSplitParent()) {
              continue;
            }
            if (regionInfo.isOffline()) {
              throw new RegionOfflineException(
                "Region offline; disable table call? " + regionInfo.getRegionNameAsString());
            }
            // It is possible that the split children have not been online yet and we have skipped
            // the parent in the above condition, so we may have already reached a region which
            // does not contain us.
            if (!regionInfo.containsRow(row)) {
              throw new IOException(
                "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName);
            }
            ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
            if (serverName == null) {
              throw new NoServerForRegionException("No server address listed in "
                + TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString()
                + " containing row " + Bytes.toStringBinary(row));
            }
            if (isDeadServer(serverName)) {
              throw new RegionServerStoppedException(
                "hbase:meta says the region " + regionInfo.getRegionNameAsString()
                  + " is managed by the server " + serverName + ", but it is dead.");
            }
            // Instantiate the location
            cacheLocation(tableName, locations);
            return locations;
          }
        }
      } catch (TableNotFoundException e) {
        // if we got this error, probably means the table just plain doesn't
        // exist. rethrow the error immediately. this should always be coming
        // from the HTable constructor.
        throw e;
      } catch (LocalConnectionClosedException cce) {
        // LocalConnectionClosedException is specialized instance of DoNotRetryIOE.
        // Thrown when we check if this connection is closed. If it is, don't retry.
        throw cce;
      } catch (IOException e) {
        ExceptionUtil.rethrowIfInterrupt(e);
        if (e instanceof RemoteException) {
          e = ((RemoteException) e).unwrapRemoteException();
        }
        if (HBaseServerException.isServerOverloaded(e)) {
          // Give a special pause when encountering an exception indicating the server
          // is overloaded. see #HBASE-17114 and HBASE-26807
          pauseBase = connectionConfig.getPauseMillisForServerOverloaded();
        }
        if (tries < maxAttempts - 1) {
          LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying "
            + "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e);
        } else {
          throw e;
        }
        // Only relocate the parent region if necessary
        relocateMeta =
          !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException);
      } finally {
        userRegionLock.unlock();
      }
      try {
        Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries));
      } catch (InterruptedException e) {
        throw new InterruptedIOException(
          "Giving up trying to locate region in " + "meta: thread is interrupted.");
      }
    }
  }

  void takeUserRegionLock() throws IOException {
    try {
      long waitTime = connectionConfig.getMetaOperationTimeout();
      if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
        throw new LockTimeoutException("Failed to get user region lock in " + waitTime + " ms. "
          + " for accessing meta region server.");
      }
    } catch (InterruptedException ie) {
      LOG.error("Interrupted while waiting for a lock", ie);
      throw ExceptionUtil.asInterrupt(ie);
    }
  }

  /**
   * Put a newly discovered HRegionLocation into the cache.
   * @param tableName The table name.
   * @param location  the new location
   */
  @Override
  public void cacheLocation(final TableName tableName, final RegionLocations location) {
    metaCache.cacheLocation(tableName, location);
  }

  /**
   * Search the cache for a location that fits our table and row key. Return null if no suitable
   * region is located.
   * @return Null or region location found in cache.
   */
  RegionLocations getCachedLocation(final TableName tableName, final byte[] row) {
    return metaCache.getCachedLocation(tableName, row);
  }

  public void clearRegionCache(final TableName tableName, byte[] row) {
    metaCache.clearCache(tableName, row);
  }

  /*
   * Delete all cached entries of a table that maps to a specific location.
   */
  @Override
  public void clearCaches(final ServerName serverName) {
    metaCache.clearCache(serverName);
  }

  @Override
  public void clearRegionLocationCache() {
    metaCache.clearCache();
  }

  @Override
  public void clearRegionCache(final TableName tableName) {
    metaCache.clearCache(tableName);
  }

  /**
   * Put a newly discovered HRegionLocation into the cache.
   * @param tableName The table name.
   * @param source    the source of the new location, if it's not coming from meta
   * @param location  the new location
   */
  private void cacheLocation(final TableName tableName, final ServerName source,
    final HRegionLocation location) {
    metaCache.cacheLocation(tableName, source, location);
  }

  // Map keyed by service name + regionserver to service stub implementation
  private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>();

  /**
   * State of the MasterService connection/setup.
   */
  static class MasterServiceState {
    Connection connection;

    MasterProtos.MasterService.BlockingInterface stub;
    int userCount;

    MasterServiceState(final Connection connection) {
      super();
      this.connection = connection;
    }

    @Override
    public String toString() {
      return "MasterService";
    }

    Object getStub() {
      return this.stub;
    }

    void clearStub() {
      this.stub = null;
    }

    boolean isMasterRunning() throws IOException {
      MasterProtos.IsMasterRunningResponse response = null;
      try {
        response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
      } catch (Exception e) {
        throw ProtobufUtil.handleRemoteException(e);
      }
      return response != null ? response.getIsMasterRunning() : false;
    }
  }

  /**
   * The record of errors for servers.
   */
  static class ServerErrorTracker {
    // We need a concurrent map here, as we could have multiple threads updating it in parallel.
    private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
      new ConcurrentHashMap<>();
    private final long canRetryUntil;
    private final int maxTries; // max number to try
    private final long startTrackingTime;

    /**
     * Constructor
     * @param timeout  how long to wait before timeout, in unit of millisecond
     * @param maxTries how many times to try
     */
    @SuppressWarnings("JavaUtilDate")
    public ServerErrorTracker(long timeout, int maxTries) {
      this.maxTries = maxTries;
      this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
      this.startTrackingTime = new Date().getTime();
    }

    /**
     * We stop retrying when we have exhausted BOTH the number of tries and the time allocated.
     * @param numAttempt how many times we have tried by now
     */
    boolean canTryMore(int numAttempt) {
      // If there is a single try we must not take into account the time.
      return numAttempt < maxTries
        || (maxTries > 1 && EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
    }

    /**
     * Calculates the back-off time for a retrying request to a particular server.
     * @param server    The server in question.
     * @param basePause The default hci pause.
     * @return The time to wait before sending next request.
     */
    long calculateBackoffTime(ServerName server, long basePause) {
      long result;
      ServerErrors errorStats = errorsByServer.get(server);
      if (errorStats != null) {
        result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1));
      } else {
        result = 0; // yes, if the server is not in our list we don't wait before retrying.
      }
      return result;
    }

    /**
     * Reports that there was an error on the server to do whatever bean-counting necessary.
     * @param server The server in question.
     */
    void reportServerError(ServerName server) {
      computeIfAbsent(errorsByServer, server, ServerErrors::new).addError();
    }

    long getStartTrackingTime() {
      return startTrackingTime;
    }

    /**
     * The record of errors for a server.
     */
    private static class ServerErrors {
      private final AtomicInteger retries = new AtomicInteger(0);

      public int getCount() {
        return retries.get();
      }

      public void addError() {
        retries.incrementAndGet();
      }
    }
  }

  /**
   * Class to make a MasterService blocking stub.
   */
  private final class MasterServiceStubMaker {

    private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
      throws IOException {
      try {
        stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest());
      } catch (ServiceException e) {
        throw ProtobufUtil.handleRemoteException(e);
      }
    }

    /**
     * Create a stub. Try once only. It is not typed because there is no common type to protobuf
     * services nor their interfaces. Let the caller do appropriate casting.
     * @return A stub for master services.
     */
    private MasterProtos.MasterService.BlockingInterface makeStubNoRetries()
      throws IOException, KeeperException {
      ServerName sn = get(registry.getActiveMaster());
      if (sn == null) {
        String msg = "ZooKeeper available but no active master location found";
        LOG.info(msg);
        throw new MasterNotRunningException(msg);
      }
      if (isDeadServer(sn)) {
        throw new MasterNotRunningException(sn + " is dead.");
      }
      // Use the security info interface name as our stub key
      String key = getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn);
      MasterProtos.MasterService.BlockingInterface stub =
        (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
          BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout);
          return MasterProtos.MasterService.newBlockingStub(channel);
        });
      isMasterRunning(stub);
      return stub;
    }

    /**
     * Create a stub against the master. Retry if necessary.
     * @return A stub to make RPC calls against the master
     * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running
     */
    MasterProtos.MasterService.BlockingInterface makeStub() throws IOException {
      // The lock must be at the beginning to prevent multiple master creations
      // (and leaks) in a multithread context
      synchronized (masterLock) {
        Exception exceptionCaught = null;
        if (!closed) {
          try {
            return makeStubNoRetries();
          } catch (IOException e) {
            exceptionCaught = e;
          } catch (KeeperException e) {
            exceptionCaught = e;
          }
          throw new MasterNotRunningException(exceptionCaught);
        } else {
          throw new DoNotRetryIOException("Connection was closed while trying to get master");
        }
      }
    }
  }

  @Override
  public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException {
    return getAdmin(get(registry.getActiveMaster()));
  }

  @Override
  public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName)
    throws IOException {
    checkClosed();
    if (isDeadServer(serverName)) {
      throw new RegionServerStoppedException(serverName + " is dead.");
    }
    String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName);
    return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
      BlockingRpcChannel channel =
        this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
      return AdminProtos.AdminService.newBlockingStub(channel);
    });
  }

  @Override
  public BlockingInterface getClient(ServerName serverName) throws IOException {
    checkClosed();
    if (isDeadServer(serverName)) {
      throw new RegionServerStoppedException(serverName + " is dead.");
    }
    String key =
      getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), serverName);
    return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> {
      BlockingRpcChannel channel =
        this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout);
      return ClientProtos.ClientService.newBlockingStub(channel);
    });
  }

  final MasterServiceState masterServiceState = new MasterServiceState(this);

  @Override
  public MasterKeepAliveConnection getMaster() throws IOException {
    return getKeepAliveMasterService();
  }

  private void resetMasterServiceState(final MasterServiceState mss) {
    mss.userCount++;
  }

  private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
    synchronized (masterLock) {
      if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
        MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
        this.masterServiceState.stub = stubMaker.makeStub();
      }
      resetMasterServiceState(this.masterServiceState);
    }
    // Ugly delegation just so we can add in a Close method.
    final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
    return new MasterKeepAliveConnection() {
      MasterServiceState mss = masterServiceState;

      @Override
      public MasterProtos.AbortProcedureResponse abortProcedure(RpcController controller,
        MasterProtos.AbortProcedureRequest request) throws ServiceException {
        return stub.abortProcedure(controller, request);
      }

      @Override
      public MasterProtos.GetProceduresResponse getProcedures(RpcController controller,
        MasterProtos.GetProceduresRequest request) throws ServiceException {
        return stub.getProcedures(controller, request);
      }

      @Override
      public MasterProtos.GetLocksResponse getLocks(RpcController controller,
        MasterProtos.GetLocksRequest request) throws ServiceException {
        return stub.getLocks(controller, request);
      }

      @Override
      public MasterProtos.AddColumnResponse addColumn(RpcController controller,
        MasterProtos.AddColumnRequest request) throws ServiceException {
        return stub.addColumn(controller, request);
      }

      @Override
      public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller,
        MasterProtos.DeleteColumnRequest request) throws ServiceException {
        return stub.deleteColumn(controller, request);
      }

      @Override
      public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller,
        MasterProtos.ModifyColumnRequest request) throws ServiceException {
        return stub.modifyColumn(controller, request);
      }

      @Override
      public MasterProtos.MoveRegionResponse moveRegion(RpcController controller,
        MasterProtos.MoveRegionRequest request) throws ServiceException {
        return stub.moveRegion(controller, request);
      }

      @Override
      public MasterProtos.MergeTableRegionsResponse mergeTableRegions(RpcController controller,
        MasterProtos.MergeTableRegionsRequest request) throws ServiceException {
        return stub.mergeTableRegions(controller, request);
      }

      @Override
      public MasterProtos.AssignRegionResponse assignRegion(RpcController controller,
        MasterProtos.AssignRegionRequest request) throws ServiceException {
        return stub.assignRegion(controller, request);
      }

      @Override
      public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller,
        MasterProtos.UnassignRegionRequest request) throws ServiceException {
        return stub.unassignRegion(controller, request);
      }

      @Override
      public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller,
        MasterProtos.OfflineRegionRequest request) throws ServiceException {
        return stub.offlineRegion(controller, request);
      }

      @Override
      public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller,
        MasterProtos.SplitTableRegionRequest request) throws ServiceException {
        return stub.splitRegion(controller, request);
      }

      @Override
      public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
        MasterProtos.DeleteTableRequest request) throws ServiceException {
        return stub.deleteTable(controller, request);
      }

      @Override
      public MasterProtos.TruncateTableResponse truncateTable(RpcController controller,
        MasterProtos.TruncateTableRequest request) throws ServiceException {
        return stub.truncateTable(controller, request);
      }

      @Override
      public MasterProtos.EnableTableResponse enableTable(RpcController controller,
        MasterProtos.EnableTableRequest request) throws ServiceException {
        return stub.enableTable(controller, request);
      }

      @Override
      public MasterProtos.DisableTableResponse disableTable(RpcController controller,
        MasterProtos.DisableTableRequest request) throws ServiceException {
        return stub.disableTable(controller, request);
      }

      @Override
      public MasterProtos.ModifyTableResponse modifyTable(RpcController controller,
        MasterProtos.ModifyTableRequest request) throws ServiceException {
        return stub.modifyTable(controller, request);
      }

      @Override
      public MasterProtos.CreateTableResponse createTable(RpcController controller,
        MasterProtos.CreateTableRequest request) throws ServiceException {
        return stub.createTable(controller, request);
      }

      @Override
      public MasterProtos.ShutdownResponse shutdown(RpcController controller,
        MasterProtos.ShutdownRequest request) throws ServiceException {
        return stub.shutdown(controller, request);
      }

      @Override
      public MasterProtos.StopMasterResponse stopMaster(RpcController controller,
        MasterProtos.StopMasterRequest request) throws ServiceException {
        return stub.stopMaster(controller, request);
      }

      @Override
      public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode(
        final RpcController controller, final MasterProtos.IsInMaintenanceModeRequest request)
        throws ServiceException {
        return stub.isMasterInMaintenanceMode(controller, request);
      }

      @Override
      public MasterProtos.BalanceResponse balance(RpcController controller,
        MasterProtos.BalanceRequest request) throws ServiceException {
        return stub.balance(controller, request);
      }

      @Override
      public MasterProtos.SetBalancerRunningResponse setBalancerRunning(RpcController controller,
        MasterProtos.SetBalancerRunningRequest request) throws ServiceException {
        return stub.setBalancerRunning(controller, request);
      }

      @Override
      public NormalizeResponse normalize(RpcController controller, NormalizeRequest request)
        throws ServiceException {
        return stub.normalize(controller, request);
      }

      @Override
      public SetNormalizerRunningResponse setNormalizerRunning(RpcController controller,
        SetNormalizerRunningRequest request) throws ServiceException {
        return stub.setNormalizerRunning(controller, request);
      }

      @Override
      public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller,
        MasterProtos.RunCatalogScanRequest request) throws ServiceException {
        return stub.runCatalogScan(controller, request);
      }

      @Override
      public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor(
        RpcController controller, MasterProtos.EnableCatalogJanitorRequest request)
        throws ServiceException {
        return stub.enableCatalogJanitor(controller, request);
      }

      @Override
      public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(
        RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request)
        throws ServiceException {
        return stub.isCatalogJanitorEnabled(controller, request);
      }

      @Override
      public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller,
        MasterProtos.RunCleanerChoreRequest request) throws ServiceException {
        return stub.runCleanerChore(controller, request);
      }

      @Override
      public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning(
        RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request)
        throws ServiceException {
        return stub.setCleanerChoreRunning(controller, request);
      }

      @Override
      public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled(
        RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request)
        throws ServiceException {
        return stub.isCleanerChoreEnabled(controller, request);
      }

      @Override
      public ClientProtos.CoprocessorServiceResponse execMasterService(RpcController controller,
        ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
        return stub.execMasterService(controller, request);
      }

      @Override
      public MasterProtos.SnapshotResponse snapshot(RpcController controller,
        MasterProtos.SnapshotRequest request) throws ServiceException {
        return stub.snapshot(controller, request);
      }

      @Override
      public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots(
        RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request)
        throws ServiceException {
        return stub.getCompletedSnapshots(controller, request);
      }

      @Override
      public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller,
        MasterProtos.DeleteSnapshotRequest request) throws ServiceException {
        return stub.deleteSnapshot(controller, request);
      }

      @Override
      public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
        MasterProtos.IsSnapshotDoneRequest request) throws ServiceException {
        return stub.isSnapshotDone(controller, request);
      }

      @Override
      public MasterProtos.RestoreSnapshotResponse restoreSnapshot(RpcController controller,
        MasterProtos.RestoreSnapshotRequest request) throws ServiceException {
        return stub.restoreSnapshot(controller, request);
      }

      @Override
      public MasterProtos.SetSnapshotCleanupResponse switchSnapshotCleanup(RpcController controller,
        MasterProtos.SetSnapshotCleanupRequest request) throws ServiceException {
        return stub.switchSnapshotCleanup(controller, request);
      }

      @Override
      public MasterProtos.IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled(
        RpcController controller, MasterProtos.IsSnapshotCleanupEnabledRequest request)
        throws ServiceException {
        return stub.isSnapshotCleanupEnabled(controller, request);
      }

      @Override
      public MasterProtos.ExecProcedureResponse execProcedure(RpcController controller,
        MasterProtos.ExecProcedureRequest request) throws ServiceException {
        return stub.execProcedure(controller, request);
      }

      @Override
      public MasterProtos.ExecProcedureResponse execProcedureWithRet(RpcController controller,
        MasterProtos.ExecProcedureRequest request) throws ServiceException {
        return stub.execProcedureWithRet(controller, request);
      }

      @Override
      public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller,
        MasterProtos.IsProcedureDoneRequest request) throws ServiceException {
        return stub.isProcedureDone(controller, request);
      }

      @Override
      public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController
controller, 1634 MasterProtos.GetProcedureResultRequest request) throws ServiceException { 1635 return stub.getProcedureResult(controller, request); 1636 } 1637 1638 @Override 1639 public MasterProtos.IsMasterRunningResponse isMasterRunning(RpcController controller, 1640 MasterProtos.IsMasterRunningRequest request) throws ServiceException { 1641 return stub.isMasterRunning(controller, request); 1642 } 1643 1644 @Override 1645 public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller, 1646 MasterProtos.ModifyNamespaceRequest request) throws ServiceException { 1647 return stub.modifyNamespace(controller, request); 1648 } 1649 1650 @Override 1651 public MasterProtos.CreateNamespaceResponse createNamespace(RpcController controller, 1652 MasterProtos.CreateNamespaceRequest request) throws ServiceException { 1653 return stub.createNamespace(controller, request); 1654 } 1655 1656 @Override 1657 public MasterProtos.DeleteNamespaceResponse deleteNamespace(RpcController controller, 1658 MasterProtos.DeleteNamespaceRequest request) throws ServiceException { 1659 return stub.deleteNamespace(controller, request); 1660 } 1661 1662 @Override 1663 public MasterProtos.ListNamespacesResponse listNamespaces(RpcController controller, 1664 MasterProtos.ListNamespacesRequest request) throws ServiceException { 1665 return stub.listNamespaces(controller, request); 1666 } 1667 1668 @Override 1669 public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor( 1670 RpcController controller, MasterProtos.GetNamespaceDescriptorRequest request) 1671 throws ServiceException { 1672 return stub.getNamespaceDescriptor(controller, request); 1673 } 1674 1675 @Override 1676 public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors( 1677 RpcController controller, MasterProtos.ListNamespaceDescriptorsRequest request) 1678 throws ServiceException { 1679 return stub.listNamespaceDescriptors(controller, request); 1680 } 1681 1682 @Override 1683 public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace( 1684 RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request) 1685 throws ServiceException { 1686 return stub.listTableDescriptorsByNamespace(controller, request); 1687 } 1688 1689 @Override 1690 public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace( 1691 RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request) 1692 throws ServiceException { 1693 return stub.listTableNamesByNamespace(controller, request); 1694 } 1695 1696 @Override 1697 public MasterProtos.GetTableStateResponse getTableState(RpcController controller, 1698 MasterProtos.GetTableStateRequest request) throws ServiceException { 1699 return stub.getTableState(controller, request); 1700 } 1701 1702 @Override 1703 public void close() { 1704 release(this.mss); 1705 } 1706 1707 @Override 1708 public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus( 1709 RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request) 1710 throws ServiceException { 1711 return stub.getSchemaAlterStatus(controller, request); 1712 } 1713 1714 @Override 1715 public MasterProtos.GetTableDescriptorsResponse getTableDescriptors(RpcController controller, 1716 MasterProtos.GetTableDescriptorsRequest request) throws ServiceException { 1717 return stub.getTableDescriptors(controller, request); 1718 } 1719 1720 @Override 1721 public MasterProtos.GetTableNamesResponse getTableNames(RpcController controller, 
1722 MasterProtos.GetTableNamesRequest request) throws ServiceException { 1723 return stub.getTableNames(controller, request); 1724 } 1725 1726 @Override 1727 public MasterProtos.GetClusterStatusResponse getClusterStatus(RpcController controller, 1728 MasterProtos.GetClusterStatusRequest request) throws ServiceException { 1729 return stub.getClusterStatus(controller, request); 1730 } 1731 1732 @Override 1733 public MasterProtos.SetQuotaResponse setQuota(RpcController controller, 1734 MasterProtos.SetQuotaRequest request) throws ServiceException { 1735 return stub.setQuota(controller, request); 1736 } 1737 1738 @Override 1739 public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp( 1740 RpcController controller, MasterProtos.MajorCompactionTimestampRequest request) 1741 throws ServiceException { 1742 return stub.getLastMajorCompactionTimestamp(controller, request); 1743 } 1744 1745 @Override 1746 public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion( 1747 RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request) 1748 throws ServiceException { 1749 return stub.getLastMajorCompactionTimestampForRegion(controller, request); 1750 } 1751 1752 @Override 1753 public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller, 1754 IsBalancerEnabledRequest request) throws ServiceException { 1755 return stub.isBalancerEnabled(controller, request); 1756 } 1757 1758 @Override 1759 public MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled( 1760 RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request) 1761 throws ServiceException { 1762 return stub.setSplitOrMergeEnabled(controller, request); 1763 } 1764 1765 @Override 1766 public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled( 1767 RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request) 1768 throws ServiceException { 1769 return stub.isSplitOrMergeEnabled(controller, request); 1770 } 1771 1772 @Override 1773 public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller, 1774 IsNormalizerEnabledRequest request) throws ServiceException { 1775 return stub.isNormalizerEnabled(controller, request); 1776 } 1777 1778 @Override 1779 public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller, 1780 SecurityCapabilitiesRequest request) throws ServiceException { 1781 return stub.getSecurityCapabilities(controller, request); 1782 } 1783 1784 @Override 1785 public AddReplicationPeerResponse addReplicationPeer(RpcController controller, 1786 AddReplicationPeerRequest request) throws ServiceException { 1787 return stub.addReplicationPeer(controller, request); 1788 } 1789 1790 @Override 1791 public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller, 1792 RemoveReplicationPeerRequest request) throws ServiceException { 1793 return stub.removeReplicationPeer(controller, request); 1794 } 1795 1796 @Override 1797 public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller, 1798 EnableReplicationPeerRequest request) throws ServiceException { 1799 return stub.enableReplicationPeer(controller, request); 1800 } 1801 1802 @Override 1803 public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller, 1804 DisableReplicationPeerRequest request) throws ServiceException { 1805 return stub.disableReplicationPeer(controller, request); 1806 } 1807 1808 @Override 1809 public 
ListDecommissionedRegionServersResponse listDecommissionedRegionServers( 1810 RpcController controller, ListDecommissionedRegionServersRequest request) 1811 throws ServiceException { 1812 return stub.listDecommissionedRegionServers(controller, request); 1813 } 1814 1815 @Override 1816 public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller, 1817 DecommissionRegionServersRequest request) throws ServiceException { 1818 return stub.decommissionRegionServers(controller, request); 1819 } 1820 1821 @Override 1822 public RecommissionRegionServerResponse recommissionRegionServer(RpcController controller, 1823 RecommissionRegionServerRequest request) throws ServiceException { 1824 return stub.recommissionRegionServer(controller, request); 1825 } 1826 1827 @Override 1828 public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller, 1829 GetReplicationPeerConfigRequest request) throws ServiceException { 1830 return stub.getReplicationPeerConfig(controller, request); 1831 } 1832 1833 @Override 1834 public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig( 1835 RpcController controller, UpdateReplicationPeerConfigRequest request) 1836 throws ServiceException { 1837 return stub.updateReplicationPeerConfig(controller, request); 1838 } 1839 1840 @Override 1841 public ListReplicationPeersResponse listReplicationPeers(RpcController controller, 1842 ListReplicationPeersRequest request) throws ServiceException { 1843 return stub.listReplicationPeers(controller, request); 1844 } 1845 1846 @Override 1847 public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes(RpcController controller, 1848 GetSpaceQuotaRegionSizesRequest request) throws ServiceException { 1849 return stub.getSpaceQuotaRegionSizes(controller, request); 1850 } 1851 1852 @Override 1853 public GetQuotaStatesResponse getQuotaStates(RpcController controller, 1854 GetQuotaStatesRequest request) throws ServiceException { 1855 return stub.getQuotaStates(controller, request); 1856 } 1857 1858 @Override 1859 public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller, 1860 MasterProtos.ClearDeadServersRequest request) throws ServiceException { 1861 return stub.clearDeadServers(controller, request); 1862 } 1863 1864 @Override 1865 public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, 1866 SwitchRpcThrottleRequest request) throws ServiceException { 1867 return stub.switchRpcThrottle(controller, request); 1868 } 1869 1870 @Override 1871 public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller, 1872 IsRpcThrottleEnabledRequest request) throws ServiceException { 1873 return stub.isRpcThrottleEnabled(controller, request); 1874 } 1875 1876 @Override 1877 public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller, 1878 SwitchExceedThrottleQuotaRequest request) throws ServiceException { 1879 return stub.switchExceedThrottleQuota(controller, request); 1880 } 1881 1882 @Override 1883 public AccessControlProtos.GrantResponse grant(RpcController controller, 1884 AccessControlProtos.GrantRequest request) throws ServiceException { 1885 return stub.grant(controller, request); 1886 } 1887 1888 @Override 1889 public AccessControlProtos.RevokeResponse revoke(RpcController controller, 1890 AccessControlProtos.RevokeRequest request) throws ServiceException { 1891 return stub.revoke(controller, request); 1892 } 1893 1894 @Override 1895 public GetUserPermissionsResponse 
getUserPermissions(RpcController controller, 1896 GetUserPermissionsRequest request) throws ServiceException { 1897 return stub.getUserPermissions(controller, request); 1898 } 1899 1900 @Override 1901 public HasUserPermissionsResponse hasUserPermissions(RpcController controller, 1902 HasUserPermissionsRequest request) throws ServiceException { 1903 return stub.hasUserPermissions(controller, request); 1904 } 1905 1906 @Override 1907 public HBaseProtos.LogEntry getLogEntries(RpcController controller, 1908 HBaseProtos.LogRequest request) throws ServiceException { 1909 return stub.getLogEntries(controller, request); 1910 } 1911 1912 @Override 1913 public ModifyTableStoreFileTrackerResponse modifyTableStoreFileTracker( 1914 RpcController controller, ModifyTableStoreFileTrackerRequest request) 1915 throws ServiceException { 1916 return stub.modifyTableStoreFileTracker(controller, request); 1917 } 1918 1919 @Override 1920 public ModifyColumnStoreFileTrackerResponse modifyColumnStoreFileTracker( 1921 RpcController controller, ModifyColumnStoreFileTrackerRequest request) 1922 throws ServiceException { 1923 return stub.modifyColumnStoreFileTracker(controller, request); 1924 } 1925 1926 @Override 1927 public FlushMasterStoreResponse flushMasterStore(RpcController controller, 1928 FlushMasterStoreRequest request) throws ServiceException { 1929 return stub.flushMasterStore(controller, request); 1930 } 1931 }; 1932 } 1933 1934 private static void release(MasterServiceState mss) { 1935 if (mss != null && mss.connection != null) { 1936 ((ConnectionImplementation) mss.connection).releaseMaster(mss); 1937 } 1938 } 1939 1940 private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) { 1941 if (mss.getStub() == null) { 1942 return false; 1943 } 1944 try { 1945 return mss.isMasterRunning(); 1946 } catch (UndeclaredThrowableException e) { 1947 // It's somewhat messy, but we can receive exceptions such as 1948 // java.net.ConnectException even though they're not declared, so we catch them here. 1949 LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable()); 1950 return false; 1951 } catch (IOException se) { 1952 LOG.warn("Checking master connection", se); 1953 return false; 1954 } 1955 } 1956 1957 void releaseMaster(MasterServiceState mss) { 1958 if (mss.getStub() == null) { 1959 return; 1960 } 1961 synchronized (masterLock) { 1962 --mss.userCount; 1963 } 1964 } 1965 1966 private void closeMasterService(MasterServiceState mss) { 1967 if (mss.getStub() != null) { 1968 LOG.info("Closing master protocol: " + mss); 1969 mss.clearStub(); 1970 } 1971 mss.userCount = 0; 1972 } 1973 1974 /** 1975 * Immediate close of the shared master. Can be called by the delayed close or when closing the 1976 * connection itself. 1977 */ 1978 private void closeMaster() { 1979 synchronized (masterLock) { 1980 closeMasterService(masterServiceState); 1981 } 1982 } 1983 1984 void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) { 1985 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum); 1986 cacheLocation(hri.getTable(), source, newHrl); 1987 } 1988 1989 @Override 1990 public void deleteCachedRegionLocation(final HRegionLocation location) { 1991 metaCache.clearCache(location); 1992 } 1993
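  // Illustrative sketch of the keep-alive bookkeeping above (placeholder variable "conn";
  // assumes a single thread and no other users of the master connection):
  //
  //   MasterKeepAliveConnection m1 = conn.getMaster(); // getKeepAliveMasterService(): userCount == 1
  //   MasterKeepAliveConnection m2 = conn.getMaster(); // same underlying stub, userCount == 2
  //   m1.close();                                      // releaseMaster(): userCount == 1
  //   conn.close();                                    // closeMaster() -> closeMasterService(): stub cleared, userCount == 0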
1994 /** 1995 * Update the location with the new value (if the exception is a RegionMovedException) or delete 1996 * it from the cache. Does nothing if we can be sure from the exception that the location is still 1997 * accurate, or if the cache has already been updated. 1998 * @param exception an object (to simplify user code) on which we will try to find a nested or 1999 * wrapped (or both) RegionMovedException 2000 * @param source server that is the source of the location update. 2001 */ 2002 @Override 2003 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey, 2004 final Object exception, final ServerName source) { 2005 if (rowkey == null || tableName == null) { 2006 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) 2007 + ", tableName=" + (tableName == null ? "null" : tableName)); 2008 return; 2009 } 2010 2011 if (source == null) { 2012 // This should not happen, but let's secure ourselves. 2013 return; 2014 } 2015 2016 if (regionName == null) { 2017 // we do not know which region, so just remove the cache entry for the row and server 2018 if (metrics != null) { 2019 metrics.incrCacheDroppingExceptions(exception); 2020 } 2021 metaCache.clearCache(tableName, rowkey, source); 2022 return; 2023 } 2024 2025 // Is it something we have already updated? 2026 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey); 2027 HRegionLocation oldLocation = null; 2028 if (oldLocations != null) { 2029 oldLocation = oldLocations.getRegionLocationByRegionName(regionName); 2030 } 2031 if (oldLocation == null || !source.equals(oldLocation.getServerName())) { 2032 // There is no such location in the cache (it's been removed already) or 2033 // the cache has already been refreshed with a different location. => nothing to do 2034 return; 2035 } 2036 2037 RegionInfo regionInfo = oldLocation.getRegion(); 2038 Throwable cause = ClientExceptionsUtil.findException(exception); 2039 if (cause != null) { 2040 if (!ClientExceptionsUtil.isMetaClearingException(cause)) { 2041 // We know that the region is still on this region server 2042 return; 2043 } 2044 2045 if (cause instanceof RegionMovedException) { 2046 RegionMovedException rme = (RegionMovedException) cause; 2047 if (LOG.isTraceEnabled()) { 2048 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " 2049 + rme.getHostname() + ":" + rme.getPort() + " according to " + source.getAddress()); 2050 } 2051 // We know that the region is no longer on this region server, but we know 2052 // the new location. 2053 updateCachedLocation(regionInfo, source, rme.getServerName(), rme.getLocationSeqNum()); 2054 return; 2055 } 2056 } 2057 2058 if (metrics != null) { 2059 metrics.incrCacheDroppingExceptions(exception); 2060 } 2061 2062 // Tell metaReplicaSelector that the location is stale. It will create a stale entry 2063 // with timestamp internally. Next time the client looks up the same location, 2064 // it will pick a different meta replica region. 2065 if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { 2066 metaReplicaSelector.onError(oldLocation); 2067 } 2068 2069 // If we're here, it means that we cannot be sure about the location, so we remove it from 2070 // the cache. Do not send the source because source can be a new server on the same host:port. 2071 metaCache.clearCache(regionInfo); 2072 } 2073 2074 @Override 2075 public AsyncProcess getAsyncProcess() { 2076 return asyncProcess; 2077 } 2078 2079 @Override 2080 public ServerStatisticTracker getStatisticsTracker() { 2081 return this.stats; 2082 } 2083 2084 @Override 2085 public ClientBackoffPolicy getBackoffPolicy() { 2086 return this.backoffPolicy; 2087 } 2088 2089 /* 2090 * Return the number of cached regions for a table.
It will only be called from a unit test. 2091 */ 2092 int getNumberOfCachedRegionLocations(final TableName tableName) { 2093 return metaCache.getNumberOfCachedRegionLocations(tableName); 2094 } 2095 2096 @Override 2097 public void abort(final String msg, Throwable t) { 2098 if (t != null) { 2099 LOG.error(HBaseMarkers.FATAL, msg, t); 2100 } else { 2101 LOG.error(HBaseMarkers.FATAL, msg); 2102 } 2103 this.aborted = true; 2104 close(); 2105 this.closed = true; 2106 } 2107 2108 @Override 2109 public boolean isClosed() { 2110 return this.closed; 2111 } 2112 2113 @Override 2114 public boolean isAborted() { 2115 return this.aborted; 2116 } 2117 2118 @Override 2119 public void close() { 2120 TraceUtil.trace(() -> { 2121 if (this.closed) { 2122 return; 2123 } 2124 closeMaster(); 2125 shutdownPools(); 2126 if (this.metrics != null) { 2127 this.metrics.shutdown(); 2128 } 2129 this.closed = true; 2130 if (this.registry != null) { 2131 registry.close(); 2132 } 2133 this.stubs.clear(); 2134 if (clusterStatusListener != null) { 2135 clusterStatusListener.close(); 2136 } 2137 if (rpcClient != null) { 2138 rpcClient.close(); 2139 } 2140 synchronized (this) { 2141 if (choreService != null) { 2142 choreService.shutdown(); 2143 } 2144 } 2145 }, this.getClass().getSimpleName() + ".close"); 2146 } 2147 2148 /** 2149 * Close the connection for good. On the off chance that someone is unable to close the 2150 * connection, perhaps because it bailed out prematurely, the method below will ensure that this 2151 * instance is cleaned up. Caveat: The JVM may take an unknown amount of time to call finalize on 2152 * an unreachable object, so our hope is that every consumer cleans up after itself, like any good 2153 * citizen. 2154 */ 2155 @Override 2156 protected void finalize() throws Throwable { 2157 super.finalize(); 2158 close(); 2159 } 2160 2161 @Override 2162 public NonceGenerator getNonceGenerator() { 2163 return nonceGenerator; 2164 } 2165 2166 @Override 2167 public TableState getTableState(TableName tableName) throws IOException { 2168 checkClosed(); 2169 TableState tableState = MetaTableAccessor.getTableState(this, tableName); 2170 if (tableState == null) { 2171 throw new TableNotFoundException(tableName); 2172 } 2173 return tableState; 2174 } 2175 2176 @Override 2177 public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { 2178 return RpcRetryingCallerFactory.instantiate(conf, this.interceptor, 2179 this.getStatisticsTracker()); 2180 } 2181 2182 @Override 2183 public boolean hasCellBlockSupport() { 2184 return this.rpcClient.hasCellBlockSupport(); 2185 } 2186 2187 @Override 2188 public ConnectionConfiguration getConnectionConfiguration() { 2189 return this.connectionConfig; 2190 } 2191 2192 @Override 2193 public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { 2194 return this.rpcCallerFactory; 2195 } 2196 2197 @Override 2198 public RpcControllerFactory getRpcControllerFactory() { 2199 return this.rpcControllerFactory; 2200 } 2201 2202 private static <T> T get(CompletableFuture<T> future) throws IOException { 2203 try { 2204 return future.get(); 2205 } catch (InterruptedException e) { 2206 Thread.currentThread().interrupt(); 2207 throw (IOException) new InterruptedIOException().initCause(e); 2208 } catch (ExecutionException e) { 2209 Throwable cause = e.getCause(); 2210 Throwables.propagateIfPossible(cause, IOException.class); 2211 throw new IOException(cause); 2212 } 2213 } 2214 2215 @Override 2216 public String getClusterId() { 2217 try { 2218 return 
registry.getClusterId().get(); 2219 } catch (InterruptedException | ExecutionException e) { 2220 LOG.error("Error fetching cluster ID: ", e); 2221 } 2222 return null; 2223 } 2224}
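// Illustrative usage sketch (table and row names are placeholders). Application code normally
// reaches this implementation through the standard client entry points such as ConnectionFactory
// rather than constructing it directly:
//
//   Configuration conf = HBaseConfiguration.create();
//   try (Connection connection = ConnectionFactory.createConnection(conf);
//        Table table = connection.getTable(TableName.valueOf("example_table"))) {
//     Result r = table.get(new Get(Bytes.toBytes("example_row")));
//     // ... use r ...
//   }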