001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS; 022import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS; 023import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 026import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; 027import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 028import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE; 029import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; 030import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx; 031 032import edu.umd.cs.findbugs.annotations.Nullable; 033import java.io.Closeable; 034import java.io.IOException; 035import java.io.InterruptedIOException; 036import java.lang.reflect.UndeclaredThrowableException; 037import java.util.ArrayList; 038import java.util.Collections; 
039import java.util.Date; 040import java.util.List; 041import java.util.concurrent.BlockingQueue; 042import java.util.concurrent.CompletableFuture; 043import java.util.concurrent.ConcurrentHashMap; 044import java.util.concurrent.ConcurrentMap; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.LinkedBlockingQueue; 048import java.util.concurrent.ThreadPoolExecutor; 049import java.util.concurrent.TimeUnit; 050import java.util.concurrent.atomic.AtomicInteger; 051import java.util.concurrent.locks.ReentrantLock; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.hbase.AuthUtil; 054import org.apache.hadoop.hbase.CallQueueTooBigException; 055import org.apache.hadoop.hbase.ChoreService; 056import org.apache.hadoop.hbase.DoNotRetryIOException; 057import org.apache.hadoop.hbase.HConstants; 058import org.apache.hadoop.hbase.HRegionLocation; 059import org.apache.hadoop.hbase.MasterNotRunningException; 060import org.apache.hadoop.hbase.MetaTableAccessor; 061import org.apache.hadoop.hbase.RegionLocations; 062import org.apache.hadoop.hbase.ServerName; 063import org.apache.hadoop.hbase.TableName; 064import org.apache.hadoop.hbase.TableNotEnabledException; 065import org.apache.hadoop.hbase.TableNotFoundException; 066import org.apache.hadoop.hbase.ZooKeeperConnectionException; 067import org.apache.hadoop.hbase.client.Scan.ReadType; 068import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 069import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 070import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 071import org.apache.hadoop.hbase.exceptions.RegionMovedException; 072import org.apache.hadoop.hbase.ipc.RpcClient; 073import org.apache.hadoop.hbase.ipc.RpcClientFactory; 074import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 075import org.apache.hadoop.hbase.log.HBaseMarkers; 076import 
org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 077import org.apache.hadoop.hbase.security.User; 078import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 079import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.GetUserPermissionsResponse; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsRequest; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos.HasUserPermissionsResponse; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.BlockingInterface; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 090import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersRequest; 091import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DecommissionRegionServersResponse; 092import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledRequest; 093import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsBalancerEnabledResponse; 094import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledRequest; 095import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsNormalizerEnabledResponse; 096import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledRequest; 097import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse; 098import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersRequest; 099import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListDecommissionedRegionServersResponse; 100import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; 101import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeResponse; 102import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; 103import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; 104import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesRequest; 105import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse; 106import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; 107import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse; 108import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest; 109import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse; 110import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; 116import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; 117import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; 118import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; 119import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; 120import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerRequest; 121import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; 122import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; 123import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; 124import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; 125import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; 126import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; 127import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; 128import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; 129import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; 130import org.apache.hadoop.hbase.util.Bytes; 131import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 132import org.apache.hadoop.hbase.util.ExceptionUtil; 133import org.apache.hadoop.hbase.util.Pair; 134import org.apache.hadoop.hbase.util.ReflectionUtils; 135import org.apache.hadoop.hbase.util.Threads; 136import org.apache.hadoop.ipc.RemoteException; 137import org.apache.hadoop.security.UserGroupInformation; 138import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 139import 
org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Main implementation of {@link Connection} and {@link ClusterConnection} interfaces.
 * Encapsulates connection to zookeeper and regionservers.
 * <p>
 * A connection creates, at construction time or lazily on first use, the connection registry,
 * the rpc client, an optional cluster status listener, a chore service and the shared batch and
 * meta-lookup thread pools used by the tables it hands out.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
    justification="Access to the conncurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
class ConnectionImplementation implements ClusterConnection, Closeable {
  public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
  private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);

  // Whether stub keys should be resolved from hostnames again after failures; defaults to true.
  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

  private final boolean hostnamesCanChange;
  // Base client pause read from HConstants.HBASE_CLIENT_PAUSE.
  private final long pause;
  // Pause for CallQueueTooBigException, if specified; never smaller than pause.
  private final long pauseForCQTBE;
  // The mode tells if HedgedRead, LoadBalance mode is supported.
  // The default mode is CatalogReplicaMode.NONE; the constructor upgrades it to HEDGED_READ
  // when the legacy USE_META_REPLICAS flag is set.
  private CatalogReplicaMode metaReplicaMode;
  // Only assigned when metaReplicaMode is LOAD_BALANCE; otherwise stays null.
  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;

  private final int metaReplicaCallTimeoutScanInMicroSecond;
  // Total attempts, i.e. configured retries converted via retries2Attempts.
  private final int numTries;
  final int rpcTimeout;

  /**
   * Global nonceGenerator shared per client (it is static, so shared by every connection in this
   * JVM). Currently there's no reason to limit its scope.
   * When client nonces are enabled it is lazily created once under nonceGeneratorCreateLock.
   * NOTE(review): it is not immutable afterwards: a connection constructed with nonces disabled
   * overwrites it with NO_NONCE_GENERATOR outside the lock, and injectNonceGeneratorForTesting
   * replaces it.
   */
  private static volatile NonceGenerator nonceGenerator = null;
  /** The nonce generator lock. Only taken in the constructor to lazily create the generator. */
  private static final Object nonceGeneratorCreateLock = new Object();

  private final AsyncProcess asyncProcess;
  // single tracker per connection
  private final ServerStatisticTracker stats;

  // Set once the connection is closed; checkClosed() rejects calls afterwards.
  private volatile boolean closed;
  private volatile boolean aborted;

  // package protected for the tests
  ClusterStatusListener clusterStatusListener;

  // Serializes lookups of the hbase:meta location itself (see locateMeta).
  private final Object metaRegionLock = new Object();

  private final Object masterLock = new Object();

  // thread executor shared by all Table instances created
  // by this connection; either supplied by the caller or created lazily
  private volatile ThreadPoolExecutor batchPool = null;
  // meta thread executor shared by all Table instances created
  // by this connection; created lazily on first use
  private volatile ThreadPoolExecutor metaLookupPool = null;
  // True only when this connection created batchPool itself and must therefore shut it down.
  private volatile boolean cleanupPool = false;

  private final Configuration conf;

  // cache the configuration value for tables so that we can avoid calling
  // the expensive Configuration to fetch the value multiple times.
  private final ConnectionConfiguration connectionConfig;

  // Client rpc instance.
  private final RpcClient rpcClient;

  private final MetaCache metaCache;
  // Client-side metrics; null unless CLIENT_SIDE_METRICS_ENABLED_KEY is set.
  private final MetricsConnection metrics;

  protected User user;

  private final RpcRetryingCallerFactory rpcCallerFactory;

  private final RpcControllerFactory rpcControllerFactory;

  private final RetryingCallerInterceptor interceptor;

  /**
   * Cluster registry of basic info such as clusterid and meta region location.
   */
  private final ConnectionRegistry registry;

  private final ClientBackoffPolicy backoffPolicy;

  /**
   * Allow setting an alternate BufferedMutator implementation via
   * config. If null, use default.
234 */ 235 private final String alternateBufferedMutatorClassName; 236 237 /** lock guards against multiple threads trying to query the meta region at the same time */ 238 private final ReentrantLock userRegionLock = new ReentrantLock(); 239 240 private ChoreService choreService; 241 242 /** 243 * constructor 244 * @param conf Configuration object 245 */ 246 ConnectionImplementation(Configuration conf, ExecutorService pool, User user) throws IOException { 247 this.conf = conf; 248 this.user = user; 249 if (user != null && user.isLoginFromKeytab()) { 250 spawnRenewalChore(user.getUGI()); 251 } 252 this.batchPool = (ThreadPoolExecutor) pool; 253 this.connectionConfig = new ConnectionConfiguration(conf); 254 this.closed = false; 255 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, 256 HConstants.DEFAULT_HBASE_CLIENT_PAUSE); 257 long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause); 258 if (configuredPauseForCQTBE < pause) { 259 LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: " 260 + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE 261 + ", will use " + pause + " instead."); 262 this.pauseForCQTBE = pause; 263 } else { 264 this.pauseForCQTBE = configuredPauseForCQTBE; 265 } 266 this.metaReplicaCallTimeoutScanInMicroSecond = 267 connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan(); 268 269 // how many times to try, one more than max *retry* time 270 this.numTries = retries2Attempts(connectionConfig.getRetriesNumber()); 271 this.rpcTimeout = conf.getInt( 272 HConstants.HBASE_RPC_TIMEOUT_KEY, 273 HConstants.DEFAULT_HBASE_RPC_TIMEOUT); 274 if (conf.getBoolean(NonceGenerator.CLIENT_NONCES_ENABLED_KEY, true)) { 275 synchronized (nonceGeneratorCreateLock) { 276 if (nonceGenerator == null) { 277 nonceGenerator = PerClientRandomNonceGenerator.get(); 278 } 279 } 280 } else { 281 nonceGenerator = NO_NONCE_GENERATOR; 282 } 283 284 this.stats = 
ServerStatisticTracker.create(conf); 285 this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build(); 286 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 287 this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats); 288 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); 289 this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, rpcControllerFactory); 290 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { 291 this.metrics = 292 new MetricsConnection(this.toString(), this::getBatchPool, this::getMetaLookupPool); 293 } else { 294 this.metrics = null; 295 } 296 this.metaCache = new MetaCache(this.metrics); 297 298 boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED, 299 HConstants.STATUS_PUBLISHED_DEFAULT); 300 this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); 301 Class<? extends ClusterStatusListener.Listener> listenerClass = 302 conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS, 303 ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, 304 ClusterStatusListener.Listener.class); 305 306 // Is there an alternate BufferedMutator to use? 307 this.alternateBufferedMutatorClassName = 308 this.conf.get(BufferedMutator.CLASSNAME_KEY); 309 310 try { 311 this.registry = ConnectionRegistryFactory.getRegistry(conf); 312 retrieveClusterId(); 313 314 this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics); 315 316 // Do we publish the status? 
317 if (shouldListen) { 318 if (listenerClass == null) { 319 LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + 320 ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status"); 321 } else { 322 clusterStatusListener = new ClusterStatusListener( 323 new ClusterStatusListener.DeadServerHandler() { 324 @Override 325 public void newDead(ServerName sn) { 326 clearCaches(sn); 327 rpcClient.cancelConnections(sn); 328 } 329 }, conf, listenerClass); 330 } 331 } 332 } catch (Throwable e) { 333 // avoid leaks: registry, rpcClient, ... 334 LOG.debug("connection construction failed", e); 335 close(); 336 throw e; 337 } 338 339 // Get the region locator's meta replica mode. 340 this.metaReplicaMode = CatalogReplicaMode.fromString(conf.get(LOCATOR_META_REPLICAS_MODE, 341 CatalogReplicaMode.NONE.toString())); 342 343 switch (this.metaReplicaMode) { 344 case LOAD_BALANCE: 345 String replicaSelectorClass = conf.get( 346 RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR, 347 CatalogReplicaLoadBalanceSimpleSelector.class.getName()); 348 349 this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector( 350 replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> { 351 int numOfReplicas = 1; 352 try { 353 RegionLocations metaLocations = registry.getMetaRegionLocations().get( 354 connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS); 355 numOfReplicas = metaLocations.size(); 356 } catch (Exception e) { 357 LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e); 358 } 359 return numOfReplicas; 360 }); 361 break; 362 case NONE: 363 // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config. 
364 365 boolean useMetaReplicas = conf.getBoolean(USE_META_REPLICAS, 366 DEFAULT_USE_META_REPLICAS); 367 if (useMetaReplicas) { 368 this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ; 369 } 370 break; 371 default: 372 // Doing nothing 373 } 374 } 375 376 private void spawnRenewalChore(final UserGroupInformation user) { 377 ChoreService service = getChoreService(); 378 service.scheduleChore(AuthUtil.getAuthRenewalChore(user)); 379 } 380 381 /** 382 * @param conn The connection for which to replace the generator. 383 * @param cnm Replaces the nonce generator used, for testing. 384 * @return old nonce generator. 385 */ 386 static NonceGenerator injectNonceGeneratorForTesting( 387 ClusterConnection conn, NonceGenerator cnm) { 388 ConnectionImplementation connImpl = (ConnectionImplementation)conn; 389 NonceGenerator ng = connImpl.getNonceGenerator(); 390 LOG.warn("Nonce generator is being replaced by test code for " 391 + cnm.getClass().getName()); 392 nonceGenerator = cnm; 393 return ng; 394 } 395 396 @Override 397 public Table getTable(TableName tableName) throws IOException { 398 return getTable(tableName, getBatchPool()); 399 } 400 401 @Override 402 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 403 return new TableBuilderBase(tableName, connectionConfig) { 404 405 @Override 406 public Table build() { 407 return new HTable(ConnectionImplementation.this, this, rpcCallerFactory, 408 rpcControllerFactory, pool); 409 } 410 }; 411 } 412 413 @Override 414 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) { 415 if (params.getTableName() == null) { 416 throw new IllegalArgumentException("TableName cannot be null."); 417 } 418 if (params.getPool() == null) { 419 params.pool(HTable.getDefaultExecutor(getConfiguration())); 420 } 421 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) { 422 params.writeBufferSize(connectionConfig.getWriteBufferSize()); 423 } 424 if 
(params.getWriteBufferPeriodicFlushTimeoutMs() == BufferedMutatorParams.UNSET) { 425 params.setWriteBufferPeriodicFlushTimeoutMs( 426 connectionConfig.getWriteBufferPeriodicFlushTimeoutMs()); 427 } 428 if (params.getWriteBufferPeriodicFlushTimerTickMs() == BufferedMutatorParams.UNSET) { 429 params.setWriteBufferPeriodicFlushTimerTickMs( 430 connectionConfig.getWriteBufferPeriodicFlushTimerTickMs()); 431 } 432 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { 433 params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); 434 } 435 // Look to see if an alternate BufferedMutation implementation is wanted. 436 // Look in params and in config. If null, use default. 437 String implementationClassName = params.getImplementationClassName(); 438 if (implementationClassName == null) { 439 implementationClassName = this.alternateBufferedMutatorClassName; 440 } 441 if (implementationClassName == null) { 442 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); 443 } 444 try { 445 return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName), 446 this, rpcCallerFactory, rpcControllerFactory, params); 447 } catch (ClassNotFoundException e) { 448 throw new RuntimeException(e); 449 } 450 } 451 452 @Override 453 public BufferedMutator getBufferedMutator(TableName tableName) { 454 return getBufferedMutator(new BufferedMutatorParams(tableName)); 455 } 456 457 @Override 458 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 459 return new HRegionLocator(tableName, this); 460 } 461 462 @Override 463 public Admin getAdmin() throws IOException { 464 return new HBaseAdmin(this); 465 } 466 467 @Override 468 public Hbck getHbck() throws IOException { 469 return getHbck(get(registry.getActiveMaster())); 470 } 471 472 @Override 473 public Hbck getHbck(ServerName masterServer) throws IOException { 474 checkClosed(); 475 if (isDeadServer(masterServer)) { 476 throw new 
RegionServerStoppedException(masterServer + " is dead."); 477 } 478 String key = getStubKey(MasterProtos.HbckService.BlockingInterface.class.getName(), 479 masterServer, this.hostnamesCanChange); 480 481 return new HBaseHbck( 482 (MasterProtos.HbckService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 483 BlockingRpcChannel channel = 484 this.rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout); 485 return MasterProtos.HbckService.newBlockingStub(channel); 486 }), rpcControllerFactory); 487 } 488 489 @Override 490 public MetricsConnection getConnectionMetrics() { 491 return this.metrics; 492 } 493 494 private ThreadPoolExecutor getBatchPool() { 495 if (batchPool == null) { 496 synchronized (this) { 497 if (batchPool == null) { 498 int threads = conf.getInt("hbase.hconnection.threads.max", 256); 499 this.batchPool = getThreadPool(threads, threads, "-shared", null); 500 this.cleanupPool = true; 501 } 502 } 503 } 504 return this.batchPool; 505 } 506 507 private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, String nameHint, 508 BlockingQueue<Runnable> passedWorkQueue) { 509 // shared HTable thread executor not yet initialized 510 if (maxThreads == 0) { 511 maxThreads = Runtime.getRuntime().availableProcessors() * 8; 512 } 513 if (coreThreads == 0) { 514 coreThreads = Runtime.getRuntime().availableProcessors() * 8; 515 } 516 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60); 517 BlockingQueue<Runnable> workQueue = passedWorkQueue; 518 if (workQueue == null) { 519 workQueue = 520 new LinkedBlockingQueue<>(maxThreads * 521 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 522 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); 523 coreThreads = maxThreads; 524 } 525 ThreadPoolExecutor tpe = 526 new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, 527 new ThreadFactoryBuilder().setNameFormat(toString() + nameHint + "-pool-%d") 528 
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 529 tpe.allowCoreThreadTimeOut(true); 530 return tpe; 531 } 532 533 private ThreadPoolExecutor getMetaLookupPool() { 534 if (this.metaLookupPool == null) { 535 synchronized (this) { 536 if (this.metaLookupPool == null) { 537 //Some of the threads would be used for meta replicas 538 //To start with, threads.max.core threads can hit the meta (including replicas). 539 //After that, requests will get queued up in the passed queue, and only after 540 //the queue is full, a new thread will be started 541 int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128); 542 this.metaLookupPool = getThreadPool( 543 threads, 544 threads, 545 "-metaLookup-shared-", new LinkedBlockingQueue<>()); 546 } 547 } 548 } 549 return this.metaLookupPool; 550 } 551 552 protected ExecutorService getCurrentMetaLookupPool() { 553 return metaLookupPool; 554 } 555 556 protected ExecutorService getCurrentBatchPool() { 557 return batchPool; 558 } 559 560 private void shutdownPools() { 561 if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) { 562 shutdownBatchPool(this.batchPool); 563 } 564 if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) { 565 shutdownBatchPool(this.metaLookupPool); 566 } 567 } 568 569 private void shutdownBatchPool(ExecutorService pool) { 570 pool.shutdown(); 571 try { 572 if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { 573 pool.shutdownNow(); 574 } 575 } catch (InterruptedException e) { 576 pool.shutdownNow(); 577 } 578 } 579 580 /** 581 * For tests only. 582 */ 583 RpcClient getRpcClient() { 584 return rpcClient; 585 } 586 587 /** 588 * An identifier that will remain the same for a given connection. 
589 */ 590 @Override 591 public String toString(){ 592 return "hconnection-0x" + Integer.toHexString(hashCode()); 593 } 594 595 protected String clusterId = null; 596 597 protected void retrieveClusterId() { 598 if (clusterId != null) { 599 return; 600 } 601 try { 602 this.clusterId = this.registry.getClusterId().get(); 603 } catch (InterruptedException | ExecutionException e) { 604 LOG.warn("Retrieve cluster id failed", e); 605 } 606 if (clusterId == null) { 607 clusterId = HConstants.CLUSTER_ID_DEFAULT; 608 LOG.debug("clusterid came back null, using default " + clusterId); 609 } 610 } 611 612 /** 613 * If choreService has not been created yet, create the ChoreService. 614 * @return ChoreService 615 */ 616 synchronized ChoreService getChoreService() { 617 if (choreService == null) { 618 choreService = new ChoreService("AsyncConn Chore Service"); 619 } 620 return choreService; 621 } 622 623 @Override 624 public Configuration getConfiguration() { 625 return this.conf; 626 } 627 628 private void checkClosed() throws DoNotRetryIOException { 629 if (this.closed) { 630 throw new DoNotRetryIOException(toString() + " closed"); 631 } 632 } 633 634 /** 635 * @return true if the master is running, throws an exception otherwise 636 * @throws org.apache.hadoop.hbase.MasterNotRunningException - if the master is not running 637 * @deprecated this has been deprecated without a replacement 638 */ 639 @Deprecated 640 @Override 641 public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException { 642 // When getting the master connection, we check it's running, 643 // so if there is no exception, it means we've been able to get a 644 // connection on a running master 645 MasterKeepAliveConnection m; 646 try { 647 m = getKeepAliveMasterService(); 648 } catch (IOException e) { 649 throw new MasterNotRunningException(e); 650 } 651 m.close(); 652 return true; 653 } 654 655 @Override 656 public HRegionLocation getRegionLocation(final TableName tableName, 
final byte[] row, 657 boolean reload) throws IOException { 658 return reload ? relocateRegion(tableName, row) : locateRegion(tableName, row); 659 } 660 661 662 @Override 663 public boolean isTableEnabled(TableName tableName) throws IOException { 664 return getTableState(tableName).inStates(TableState.State.ENABLED); 665 } 666 667 @Override 668 public boolean isTableDisabled(TableName tableName) throws IOException { 669 return getTableState(tableName).inStates(TableState.State.DISABLED); 670 } 671 672 @Override 673 public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys) 674 throws IOException { 675 checkClosed(); 676 try { 677 if (!isTableEnabled(tableName)) { 678 LOG.debug("Table {} not enabled", tableName); 679 return false; 680 } 681 if (TableName.isMetaTableName(tableName)) { 682 // meta table is always available 683 return true; 684 } 685 List<Pair<RegionInfo, ServerName>> locations = 686 MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true); 687 688 int notDeployed = 0; 689 int regionCount = 0; 690 for (Pair<RegionInfo, ServerName> pair : locations) { 691 RegionInfo info = pair.getFirst(); 692 if (pair.getSecond() == null) { 693 LOG.debug("Table {} has not deployed region {}", tableName, 694 pair.getFirst().getEncodedName()); 695 notDeployed++; 696 } else if (splitKeys != null 697 && !Bytes.equals(info.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 698 for (byte[] splitKey : splitKeys) { 699 // Just check if the splitkey is available 700 if (Bytes.equals(info.getStartKey(), splitKey)) { 701 regionCount++; 702 break; 703 } 704 } 705 } else { 706 // Always empty start row should be counted 707 regionCount++; 708 } 709 } 710 if (notDeployed > 0) { 711 if (LOG.isDebugEnabled()) { 712 LOG.debug("Table {} has {} regions not deployed", tableName, notDeployed); 713 } 714 return false; 715 } else if (splitKeys != null && regionCount != splitKeys.length + 1) { 716 if (LOG.isDebugEnabled()) { 717 LOG.debug("Table {} 
expected to have {} regions, but only {} available", tableName, 718 splitKeys.length + 1, regionCount); 719 } 720 return false; 721 } else { 722 LOG.trace("Table {} should be available", tableName); 723 return true; 724 } 725 } catch (TableNotFoundException tnfe) { 726 LOG.warn("Table {} does not exist", tableName); 727 return false; 728 } 729 } 730 731 @Override 732 public HRegionLocation locateRegion(final byte[] regionName) throws IOException { 733 RegionLocations locations = locateRegion(RegionInfo.getTable(regionName), 734 RegionInfo.getStartKey(regionName), false, true); 735 return locations == null ? null : locations.getRegionLocation(); 736 } 737 738 private boolean isDeadServer(ServerName sn) { 739 if (clusterStatusListener == null) { 740 return false; 741 } else { 742 return clusterStatusListener.isDeadServer(sn); 743 } 744 } 745 746 @Override 747 public List<HRegionLocation> locateRegions(TableName tableName) throws IOException { 748 return locateRegions(tableName, false, true); 749 } 750 751 @Override 752 public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache, 753 boolean offlined) throws IOException { 754 List<RegionInfo> regions; 755 if (TableName.isMetaTableName(tableName)) { 756 regions = Collections.singletonList(RegionInfoBuilder.FIRST_META_REGIONINFO); 757 } else { 758 regions = MetaTableAccessor.getTableRegions(this, tableName, !offlined); 759 } 760 List<HRegionLocation> locations = new ArrayList<>(); 761 for (RegionInfo regionInfo : regions) { 762 if (!RegionReplicaUtil.isDefaultReplica(regionInfo)) { 763 continue; 764 } 765 RegionLocations list = locateRegion(tableName, regionInfo.getStartKey(), useCache, true); 766 if (list != null) { 767 for (HRegionLocation loc : list.getRegionLocations()) { 768 if (loc != null) { 769 locations.add(loc); 770 } 771 } 772 } 773 } 774 return locations; 775 } 776 777 @Override 778 public HRegionLocation locateRegion(final TableName tableName, final byte[] row) 779 throws IOException { 
780 RegionLocations locations = locateRegion(tableName, row, true, true); 781 return locations == null ? null : locations.getRegionLocation(); 782 } 783 784 @Override 785 public HRegionLocation relocateRegion(final TableName tableName, final byte[] row) 786 throws IOException { 787 RegionLocations locations = 788 relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID); 789 return locations == null ? null 790 : locations.getRegionLocation(RegionReplicaUtil.DEFAULT_REPLICA_ID); 791 } 792 793 @Override 794 public RegionLocations relocateRegion(final TableName tableName, 795 final byte [] row, int replicaId) throws IOException{ 796 // Since this is an explicit request not to use any caching, finding 797 // disabled tables should not be desirable. This will ensure that an exception is thrown when 798 // the first time a disabled table is interacted with. 799 if (!tableName.equals(TableName.META_TABLE_NAME) && isTableDisabled(tableName)) { 800 throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); 801 } 802 803 return locateRegion(tableName, row, false, true, replicaId); 804 } 805 806 @Override 807 public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, 808 boolean retry) throws IOException { 809 return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID); 810 } 811 812 @Override 813 public RegionLocations locateRegion(final TableName tableName, final byte[] row, boolean useCache, 814 boolean retry, int replicaId) throws IOException { 815 checkClosed(); 816 if (tableName == null || tableName.getName().length == 0) { 817 throw new IllegalArgumentException("table name cannot be null or zero length"); 818 } 819 if (tableName.equals(TableName.META_TABLE_NAME)) { 820 return locateMeta(tableName, useCache, replicaId); 821 } else { 822 // Region not in the cache - have to go to the meta RS 823 return locateRegionInMeta(tableName, row, useCache, retry, replicaId); 
824 } 825 } 826 827 private RegionLocations locateMeta(final TableName tableName, 828 boolean useCache, int replicaId) throws IOException { 829 // HBASE-10785: We cache the location of the META itself, so that we are not overloading 830 // zookeeper with one request for every region lookup. We cache the META with empty row 831 // key in MetaCache. 832 byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta 833 RegionLocations locations = null; 834 if (useCache) { 835 locations = getCachedLocation(tableName, metaCacheKey); 836 if (locations != null && locations.getRegionLocation(replicaId) != null) { 837 return locations; 838 } 839 } 840 841 // only one thread should do the lookup. 842 synchronized (metaRegionLock) { 843 // Check the cache again for a hit in case some other thread made the 844 // same query while we were waiting on the lock. 845 if (useCache) { 846 locations = getCachedLocation(tableName, metaCacheKey); 847 if (locations != null && locations.getRegionLocation(replicaId) != null) { 848 return locations; 849 } 850 } 851 852 // Look up from zookeeper 853 locations = get(this.registry.getMetaRegionLocations()); 854 if (locations != null) { 855 cacheLocation(tableName, locations); 856 } 857 } 858 return locations; 859 } 860 861 /** 862 * Search the hbase:meta table for the HRegionLocation info that contains the table and row we're 863 * seeking. 864 */ 865 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache, 866 boolean retry, int replicaId) throws IOException { 867 // If we are supposed to be using the cache, look in the cache to see if we already have the 868 // region. 869 if (useCache) { 870 RegionLocations locations = getCachedLocation(tableName, row); 871 if (locations != null && locations.getRegionLocation(replicaId) != null) { 872 return locations; 873 } 874 } 875 // build the key of the meta region we should be looking for. 
876 // the extra 9's on the end are necessary to allow "exact" matches 877 // without knowing the precise region names. 878 byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false); 879 byte[] metaStopKey = 880 RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); 881 Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true) 882 .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5) 883 .setReadType(ReadType.PREAD); 884 885 switch (this.metaReplicaMode) { 886 case LOAD_BALANCE: 887 int metaReplicaId = this.metaReplicaSelector.select(tableName, row, 888 RegionLocateType.CURRENT); 889 if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) { 890 // If the selector gives a non-primary meta replica region, then go with it. 891 // Otherwise, just go to primary in non-hedgedRead mode. 892 s.setConsistency(Consistency.TIMELINE); 893 s.setReplicaId(metaReplicaId); 894 } 895 break; 896 case HEDGED_READ: 897 s.setConsistency(Consistency.TIMELINE); 898 break; 899 default: 900 // do nothing 901 } 902 int maxAttempts = (retry ? numTries : 1); 903 boolean relocateMeta = false; 904 for (int tries = 0; ; tries++) { 905 if (tries >= maxAttempts) { 906 throw new NoServerForRegionException("Unable to find region for " 907 + Bytes.toStringBinary(row) + " in " + tableName + " after " + tries + " tries."); 908 } 909 if (useCache) { 910 RegionLocations locations = getCachedLocation(tableName, row); 911 if (locations != null && locations.getRegionLocation(replicaId) != null) { 912 return locations; 913 } 914 } else { 915 // If we are not supposed to be using the cache, delete any existing cached location 916 // so it won't interfere. 
917 // We are only supposed to clean the cache for the specific replicaId 918 metaCache.clearCache(tableName, row, replicaId); 919 } 920 // Query the meta region 921 long pauseBase = this.pause; 922 takeUserRegionLock(); 923 try { 924 // We don't need to check if useCache is enabled or not. Even if useCache is false 925 // we already cleared the cache for this row before acquiring userRegion lock so if this 926 // row is present in cache that means some other thread has populated it while we were 927 // waiting to acquire user region lock. 928 RegionLocations locations = getCachedLocation(tableName, row); 929 if (locations != null && locations.getRegionLocation(replicaId) != null) { 930 return locations; 931 } 932 if (relocateMeta) { 933 relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, 934 RegionInfo.DEFAULT_REPLICA_ID); 935 } 936 s.resetMvccReadPoint(); 937 try (ReversedClientScanner rcs = 938 new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, 939 rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) { 940 boolean tableNotFound = true; 941 for (;;) { 942 Result regionInfoRow = rcs.next(); 943 if (regionInfoRow == null) { 944 if (tableNotFound) { 945 throw new TableNotFoundException(tableName); 946 } else { 947 throw new IOException( 948 "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName); 949 } 950 } 951 tableNotFound = false; 952 // convert the row result into the HRegionLocation we need! 
953 locations = MetaTableAccessor.getRegionLocations(regionInfoRow); 954 if (locations == null || locations.getRegionLocation(replicaId) == null) { 955 throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow); 956 } 957 RegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegion(); 958 if (regionInfo == null) { 959 throw new IOException("RegionInfo null or empty in " + TableName.META_TABLE_NAME + 960 ", row=" + regionInfoRow); 961 } 962 // See HBASE-20182. It is possible that we locate to a split parent even after the 963 // children are online, so here we need to skip this region and go to the next one. 964 if (regionInfo.isSplitParent()) { 965 continue; 966 } 967 if (regionInfo.isOffline()) { 968 throw new RegionOfflineException("Region offline; disable table call? " + 969 regionInfo.getRegionNameAsString()); 970 } 971 // It is possible that the split children have not been online yet and we have skipped 972 // the parent in the above condition, so we may have already reached a region which does 973 // not contains us. 
974 if (!regionInfo.containsRow(row)) { 975 throw new IOException( 976 "Unable to find region for " + Bytes.toStringBinary(row) + " in " + tableName); 977 } 978 ServerName serverName = locations.getRegionLocation(replicaId).getServerName(); 979 if (serverName == null) { 980 throw new NoServerForRegionException("No server address listed in " + 981 TableName.META_TABLE_NAME + " for region " + regionInfo.getRegionNameAsString() + 982 " containing row " + Bytes.toStringBinary(row)); 983 } 984 if (isDeadServer(serverName)) { 985 throw new RegionServerStoppedException( 986 "hbase:meta says the region " + regionInfo.getRegionNameAsString() + 987 " is managed by the server " + serverName + ", but it is dead."); 988 } 989 // Instantiate the location 990 cacheLocation(tableName, locations); 991 return locations; 992 } 993 } 994 } catch (TableNotFoundException e) { 995 // if we got this error, probably means the table just plain doesn't 996 // exist. rethrow the error immediately. this should always be coming 997 // from the HTable constructor. 
998 throw e; 999 } catch (IOException e) { 1000 ExceptionUtil.rethrowIfInterrupt(e); 1001 if (e instanceof RemoteException) { 1002 e = ((RemoteException)e).unwrapRemoteException(); 1003 } 1004 if (e instanceof CallQueueTooBigException) { 1005 // Give a special check on CallQueueTooBigException, see #HBASE-17114 1006 pauseBase = this.pauseForCQTBE; 1007 } 1008 if (tries < maxAttempts - 1) { 1009 LOG.debug("locateRegionInMeta parentTable='{}', attempt={} of {} failed; retrying " + 1010 "after sleep of {}", TableName.META_TABLE_NAME, tries, maxAttempts, maxAttempts, e); 1011 } else { 1012 throw e; 1013 } 1014 // Only relocate the parent region if necessary 1015 relocateMeta = 1016 !(e instanceof RegionOfflineException || e instanceof NoServerForRegionException); 1017 } finally { 1018 userRegionLock.unlock(); 1019 } 1020 try{ 1021 Thread.sleep(ConnectionUtils.getPauseTime(pauseBase, tries)); 1022 } catch (InterruptedException e) { 1023 throw new InterruptedIOException("Giving up trying to location region in " + 1024 "meta: thread is interrupted."); 1025 } 1026 } 1027 } 1028 1029 void takeUserRegionLock() throws IOException { 1030 try { 1031 long waitTime = connectionConfig.getMetaOperationTimeout(); 1032 if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) { 1033 throw new LockTimeoutException("Failed to get user region lock in" 1034 + waitTime + " ms. " + " for accessing meta region server."); 1035 } 1036 } catch (InterruptedException ie) { 1037 LOG.error("Interrupted while waiting for a lock", ie); 1038 throw ExceptionUtil.asInterrupt(ie); 1039 } 1040 } 1041 1042 /** 1043 * Put a newly discovered HRegionLocation into the cache. 1044 * @param tableName The table name. 
1045 * @param location the new location 1046 */ 1047 @Override 1048 public void cacheLocation(final TableName tableName, final RegionLocations location) { 1049 metaCache.cacheLocation(tableName, location); 1050 } 1051 1052 /** 1053 * Search the cache for a location that fits our table and row key. 1054 * Return null if no suitable region is located. 1055 * @return Null or region location found in cache. 1056 */ 1057 RegionLocations getCachedLocation(final TableName tableName, 1058 final byte [] row) { 1059 return metaCache.getCachedLocation(tableName, row); 1060 } 1061 1062 public void clearRegionCache(final TableName tableName, byte[] row) { 1063 metaCache.clearCache(tableName, row); 1064 } 1065 1066 /* 1067 * Delete all cached entries of a table that maps to a specific location. 1068 */ 1069 @Override 1070 public void clearCaches(final ServerName serverName) { 1071 metaCache.clearCache(serverName); 1072 } 1073 1074 @Override 1075 public void clearRegionLocationCache() { 1076 metaCache.clearCache(); 1077 } 1078 1079 @Override 1080 public void clearRegionCache(final TableName tableName) { 1081 metaCache.clearCache(tableName); 1082 } 1083 1084 /** 1085 * Put a newly discovered HRegionLocation into the cache. 1086 * @param tableName The table name. 1087 * @param source the source of the new location, if it's not coming from meta 1088 * @param location the new location 1089 */ 1090 private void cacheLocation(final TableName tableName, final ServerName source, 1091 final HRegionLocation location) { 1092 metaCache.cacheLocation(tableName, source, location); 1093 } 1094 1095 // Map keyed by service name + regionserver to service stub implementation 1096 private final ConcurrentMap<String, Object> stubs = new ConcurrentHashMap<>(); 1097 1098 /** 1099 * State of the MasterService connection/setup. 
1100 */ 1101 static class MasterServiceState { 1102 Connection connection; 1103 1104 MasterProtos.MasterService.BlockingInterface stub; 1105 int userCount; 1106 1107 MasterServiceState(final Connection connection) { 1108 super(); 1109 this.connection = connection; 1110 } 1111 1112 @Override 1113 public String toString() { 1114 return "MasterService"; 1115 } 1116 1117 Object getStub() { 1118 return this.stub; 1119 } 1120 1121 void clearStub() { 1122 this.stub = null; 1123 } 1124 1125 boolean isMasterRunning() throws IOException { 1126 MasterProtos.IsMasterRunningResponse response = null; 1127 try { 1128 response = this.stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); 1129 } catch (Exception e) { 1130 throw ProtobufUtil.handleRemoteException(e); 1131 } 1132 return response != null? response.getIsMasterRunning(): false; 1133 } 1134 } 1135 1136 /** 1137 * The record of errors for servers. 1138 */ 1139 static class ServerErrorTracker { 1140 // We need a concurrent map here, as we could have multiple threads updating it in parallel. 1141 private final ConcurrentMap<ServerName, ServerErrors> errorsByServer = new ConcurrentHashMap<>(); 1142 private final long canRetryUntil; 1143 private final int maxTries;// max number to try 1144 private final long startTrackingTime; 1145 1146 /** 1147 * Constructor 1148 * @param timeout how long to wait before timeout, in unit of millisecond 1149 * @param maxTries how many times to try 1150 */ 1151 public ServerErrorTracker(long timeout, int maxTries) { 1152 this.maxTries = maxTries; 1153 this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout; 1154 this.startTrackingTime = new Date().getTime(); 1155 } 1156 1157 /** 1158 * We stop to retry when we have exhausted BOTH the number of tries and the time allocated. 1159 * @param numAttempt how many times we have tried by now 1160 */ 1161 boolean canTryMore(int numAttempt) { 1162 // If there is a single try we must not take into account the time. 
1163 return numAttempt < maxTries || (maxTries > 1 && 1164 EnvironmentEdgeManager.currentTime() < this.canRetryUntil); 1165 } 1166 1167 /** 1168 * Calculates the back-off time for a retrying request to a particular server. 1169 * 1170 * @param server The server in question. 1171 * @param basePause The default hci pause. 1172 * @return The time to wait before sending next request. 1173 */ 1174 long calculateBackoffTime(ServerName server, long basePause) { 1175 long result; 1176 ServerErrors errorStats = errorsByServer.get(server); 1177 if (errorStats != null) { 1178 result = ConnectionUtils.getPauseTime(basePause, Math.max(0, errorStats.getCount() - 1)); 1179 } else { 1180 result = 0; // yes, if the server is not in our list we don't wait before retrying. 1181 } 1182 return result; 1183 } 1184 1185 /** 1186 * Reports that there was an error on the server to do whatever bean-counting necessary. 1187 * @param server The server in question. 1188 */ 1189 void reportServerError(ServerName server) { 1190 computeIfAbsent(errorsByServer, server, ServerErrors::new).addError(); 1191 } 1192 1193 long getStartTrackingTime() { 1194 return startTrackingTime; 1195 } 1196 1197 /** 1198 * The record of errors for a server. 1199 */ 1200 private static class ServerErrors { 1201 private final AtomicInteger retries = new AtomicInteger(0); 1202 1203 public int getCount() { 1204 return retries.get(); 1205 } 1206 1207 public void addError() { 1208 retries.incrementAndGet(); 1209 } 1210 } 1211 } 1212 1213 /** 1214 * Class to make a MasterServiceStubMaker stub. 1215 */ 1216 private final class MasterServiceStubMaker { 1217 1218 private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub) 1219 throws IOException { 1220 try { 1221 stub.isMasterRunning(null, RequestConverter.buildIsMasterRunningRequest()); 1222 } catch (ServiceException e) { 1223 throw ProtobufUtil.handleRemoteException(e); 1224 } 1225 } 1226 1227 /** 1228 * Create a stub. Try once only. 
It is not typed because there is no common type to protobuf 1229 * services nor their interfaces. Let the caller do appropriate casting. 1230 * @return A stub for master services. 1231 */ 1232 private MasterProtos.MasterService.BlockingInterface makeStubNoRetries() 1233 throws IOException, KeeperException { 1234 ServerName sn = get(registry.getActiveMaster()); 1235 if (sn == null) { 1236 String msg = "ZooKeeper available but no active master location found"; 1237 LOG.info(msg); 1238 throw new MasterNotRunningException(msg); 1239 } 1240 if (isDeadServer(sn)) { 1241 throw new MasterNotRunningException(sn + " is dead."); 1242 } 1243 // Use the security info interface name as our stub key 1244 String key = 1245 getStubKey(MasterProtos.MasterService.getDescriptor().getName(), sn, hostnamesCanChange); 1246 MasterProtos.MasterService.BlockingInterface stub = 1247 (MasterProtos.MasterService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1248 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, user, rpcTimeout); 1249 return MasterProtos.MasterService.newBlockingStub(channel); 1250 }); 1251 isMasterRunning(stub); 1252 return stub; 1253 } 1254 1255 /** 1256 * Create a stub against the master. Retry if necessary. 
1257 * @return A stub to do <code>intf</code> against the master 1258 * @throws org.apache.hadoop.hbase.MasterNotRunningException if master is not running 1259 */ 1260 MasterProtos.MasterService.BlockingInterface makeStub() throws IOException { 1261 // The lock must be at the beginning to prevent multiple master creations 1262 // (and leaks) in a multithread context 1263 synchronized (masterLock) { 1264 Exception exceptionCaught = null; 1265 if (!closed) { 1266 try { 1267 return makeStubNoRetries(); 1268 } catch (IOException e) { 1269 exceptionCaught = e; 1270 } catch (KeeperException e) { 1271 exceptionCaught = e; 1272 } 1273 throw new MasterNotRunningException(exceptionCaught); 1274 } else { 1275 throw new DoNotRetryIOException("Connection was closed while trying to get master"); 1276 } 1277 } 1278 } 1279 } 1280 1281 @Override 1282 public AdminProtos.AdminService.BlockingInterface getAdminForMaster() throws IOException { 1283 return getAdmin(get(registry.getActiveMaster())); 1284 } 1285 1286 @Override 1287 public AdminProtos.AdminService.BlockingInterface getAdmin(ServerName serverName) 1288 throws IOException { 1289 checkClosed(); 1290 if (isDeadServer(serverName)) { 1291 throw new RegionServerStoppedException(serverName + " is dead."); 1292 } 1293 String key = getStubKey(AdminProtos.AdminService.BlockingInterface.class.getName(), serverName, 1294 this.hostnamesCanChange); 1295 return (AdminProtos.AdminService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1296 BlockingRpcChannel channel = 1297 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); 1298 return AdminProtos.AdminService.newBlockingStub(channel); 1299 }); 1300 } 1301 1302 @Override 1303 public BlockingInterface getClient(ServerName serverName) throws IOException { 1304 checkClosed(); 1305 if (isDeadServer(serverName)) { 1306 throw new RegionServerStoppedException(serverName + " is dead."); 1307 } 1308 String key = 
getStubKey(ClientProtos.ClientService.BlockingInterface.class.getName(), 1309 serverName, this.hostnamesCanChange); 1310 return (ClientProtos.ClientService.BlockingInterface) computeIfAbsentEx(stubs, key, () -> { 1311 BlockingRpcChannel channel = 1312 this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); 1313 return ClientProtos.ClientService.newBlockingStub(channel); 1314 }); 1315 } 1316 1317 final MasterServiceState masterServiceState = new MasterServiceState(this); 1318 1319 @Override 1320 public MasterKeepAliveConnection getMaster() throws IOException { 1321 return getKeepAliveMasterService(); 1322 } 1323 1324 private void resetMasterServiceState(final MasterServiceState mss) { 1325 mss.userCount++; 1326 } 1327 1328 private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException { 1329 synchronized (masterLock) { 1330 if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) { 1331 MasterServiceStubMaker stubMaker = new MasterServiceStubMaker(); 1332 this.masterServiceState.stub = stubMaker.makeStub(); 1333 } 1334 resetMasterServiceState(this.masterServiceState); 1335 } 1336 // Ugly delegation just so we can add in a Close method. 
1337 final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub; 1338 return new MasterKeepAliveConnection() { 1339 MasterServiceState mss = masterServiceState; 1340 1341 @Override 1342 public MasterProtos.AbortProcedureResponse abortProcedure( 1343 RpcController controller, 1344 MasterProtos.AbortProcedureRequest request) throws ServiceException { 1345 return stub.abortProcedure(controller, request); 1346 } 1347 1348 @Override 1349 public MasterProtos.GetProceduresResponse getProcedures( 1350 RpcController controller, 1351 MasterProtos.GetProceduresRequest request) throws ServiceException { 1352 return stub.getProcedures(controller, request); 1353 } 1354 1355 @Override 1356 public MasterProtos.GetLocksResponse getLocks( 1357 RpcController controller, 1358 MasterProtos.GetLocksRequest request) throws ServiceException { 1359 return stub.getLocks(controller, request); 1360 } 1361 1362 @Override 1363 public MasterProtos.AddColumnResponse addColumn( 1364 RpcController controller, 1365 MasterProtos.AddColumnRequest request) throws ServiceException { 1366 return stub.addColumn(controller, request); 1367 } 1368 1369 @Override 1370 public MasterProtos.DeleteColumnResponse deleteColumn(RpcController controller, 1371 MasterProtos.DeleteColumnRequest request) 1372 throws ServiceException { 1373 return stub.deleteColumn(controller, request); 1374 } 1375 1376 @Override 1377 public MasterProtos.ModifyColumnResponse modifyColumn(RpcController controller, 1378 MasterProtos.ModifyColumnRequest request) 1379 throws ServiceException { 1380 return stub.modifyColumn(controller, request); 1381 } 1382 1383 @Override 1384 public MasterProtos.MoveRegionResponse moveRegion(RpcController controller, 1385 MasterProtos.MoveRegionRequest request) throws ServiceException { 1386 return stub.moveRegion(controller, request); 1387 } 1388 1389 @Override 1390 public MasterProtos.MergeTableRegionsResponse mergeTableRegions( 1391 RpcController controller, 
MasterProtos.MergeTableRegionsRequest request) 1392 throws ServiceException { 1393 return stub.mergeTableRegions(controller, request); 1394 } 1395 1396 @Override 1397 public MasterProtos.AssignRegionResponse assignRegion(RpcController controller, 1398 MasterProtos.AssignRegionRequest request) throws ServiceException { 1399 return stub.assignRegion(controller, request); 1400 } 1401 1402 @Override 1403 public MasterProtos.UnassignRegionResponse unassignRegion(RpcController controller, 1404 MasterProtos.UnassignRegionRequest request) throws ServiceException { 1405 return stub.unassignRegion(controller, request); 1406 } 1407 1408 @Override 1409 public MasterProtos.OfflineRegionResponse offlineRegion(RpcController controller, 1410 MasterProtos.OfflineRegionRequest request) throws ServiceException { 1411 return stub.offlineRegion(controller, request); 1412 } 1413 1414 @Override 1415 public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controller, 1416 MasterProtos.SplitTableRegionRequest request) throws ServiceException { 1417 return stub.splitRegion(controller, request); 1418 } 1419 1420 @Override 1421 public MasterProtos.DeleteTableResponse deleteTable(RpcController controller, 1422 MasterProtos.DeleteTableRequest request) throws ServiceException { 1423 return stub.deleteTable(controller, request); 1424 } 1425 1426 @Override 1427 public MasterProtos.TruncateTableResponse truncateTable(RpcController controller, 1428 MasterProtos.TruncateTableRequest request) throws ServiceException { 1429 return stub.truncateTable(controller, request); 1430 } 1431 1432 @Override 1433 public MasterProtos.EnableTableResponse enableTable(RpcController controller, 1434 MasterProtos.EnableTableRequest request) throws ServiceException { 1435 return stub.enableTable(controller, request); 1436 } 1437 1438 @Override 1439 public MasterProtos.DisableTableResponse disableTable(RpcController controller, 1440 MasterProtos.DisableTableRequest request) throws ServiceException { 1441 
return stub.disableTable(controller, request); 1442 } 1443 1444 @Override 1445 public MasterProtos.ModifyTableResponse modifyTable(RpcController controller, 1446 MasterProtos.ModifyTableRequest request) throws ServiceException { 1447 return stub.modifyTable(controller, request); 1448 } 1449 1450 @Override 1451 public MasterProtos.CreateTableResponse createTable(RpcController controller, 1452 MasterProtos.CreateTableRequest request) throws ServiceException { 1453 return stub.createTable(controller, request); 1454 } 1455 1456 @Override 1457 public MasterProtos.ShutdownResponse shutdown(RpcController controller, 1458 MasterProtos.ShutdownRequest request) throws ServiceException { 1459 return stub.shutdown(controller, request); 1460 } 1461 1462 @Override 1463 public MasterProtos.StopMasterResponse stopMaster(RpcController controller, 1464 MasterProtos.StopMasterRequest request) throws ServiceException { 1465 return stub.stopMaster(controller, request); 1466 } 1467 1468 @Override 1469 public MasterProtos.IsInMaintenanceModeResponse isMasterInMaintenanceMode( 1470 final RpcController controller, 1471 final MasterProtos.IsInMaintenanceModeRequest request) throws ServiceException { 1472 return stub.isMasterInMaintenanceMode(controller, request); 1473 } 1474 1475 @Override 1476 public MasterProtos.BalanceResponse balance(RpcController controller, 1477 MasterProtos.BalanceRequest request) throws ServiceException { 1478 return stub.balance(controller, request); 1479 } 1480 1481 @Override 1482 public MasterProtos.SetBalancerRunningResponse setBalancerRunning( 1483 RpcController controller, MasterProtos.SetBalancerRunningRequest request) 1484 throws ServiceException { 1485 return stub.setBalancerRunning(controller, request); 1486 } 1487 1488 @Override 1489 public NormalizeResponse normalize(RpcController controller, 1490 NormalizeRequest request) throws ServiceException { 1491 return stub.normalize(controller, request); 1492 } 1493 1494 @Override 1495 public 
SetNormalizerRunningResponse setNormalizerRunning( 1496 RpcController controller, SetNormalizerRunningRequest request) 1497 throws ServiceException { 1498 return stub.setNormalizerRunning(controller, request); 1499 } 1500 1501 @Override 1502 public MasterProtos.RunCatalogScanResponse runCatalogScan(RpcController controller, 1503 MasterProtos.RunCatalogScanRequest request) throws ServiceException { 1504 return stub.runCatalogScan(controller, request); 1505 } 1506 1507 @Override 1508 public MasterProtos.EnableCatalogJanitorResponse enableCatalogJanitor( 1509 RpcController controller, MasterProtos.EnableCatalogJanitorRequest request) 1510 throws ServiceException { 1511 return stub.enableCatalogJanitor(controller, request); 1512 } 1513 1514 @Override 1515 public MasterProtos.IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled( 1516 RpcController controller, MasterProtos.IsCatalogJanitorEnabledRequest request) 1517 throws ServiceException { 1518 return stub.isCatalogJanitorEnabled(controller, request); 1519 } 1520 1521 @Override 1522 public MasterProtos.RunCleanerChoreResponse runCleanerChore(RpcController controller, 1523 MasterProtos.RunCleanerChoreRequest request) 1524 throws ServiceException { 1525 return stub.runCleanerChore(controller, request); 1526 } 1527 1528 @Override 1529 public MasterProtos.SetCleanerChoreRunningResponse setCleanerChoreRunning( 1530 RpcController controller, MasterProtos.SetCleanerChoreRunningRequest request) 1531 throws ServiceException { 1532 return stub.setCleanerChoreRunning(controller, request); 1533 } 1534 1535 @Override 1536 public MasterProtos.IsCleanerChoreEnabledResponse isCleanerChoreEnabled( 1537 RpcController controller, MasterProtos.IsCleanerChoreEnabledRequest request) 1538 throws ServiceException { 1539 return stub.isCleanerChoreEnabled(controller, request); 1540 } 1541 1542 @Override 1543 public ClientProtos.CoprocessorServiceResponse execMasterService( 1544 RpcController controller, 
ClientProtos.CoprocessorServiceRequest request) 1545 throws ServiceException { 1546 return stub.execMasterService(controller, request); 1547 } 1548 1549 @Override 1550 public MasterProtos.SnapshotResponse snapshot(RpcController controller, 1551 MasterProtos.SnapshotRequest request) throws ServiceException { 1552 return stub.snapshot(controller, request); 1553 } 1554 1555 @Override 1556 public MasterProtos.GetCompletedSnapshotsResponse getCompletedSnapshots( 1557 RpcController controller, MasterProtos.GetCompletedSnapshotsRequest request) 1558 throws ServiceException { 1559 return stub.getCompletedSnapshots(controller, request); 1560 } 1561 1562 @Override 1563 public MasterProtos.DeleteSnapshotResponse deleteSnapshot(RpcController controller, 1564 MasterProtos.DeleteSnapshotRequest request) throws ServiceException { 1565 return stub.deleteSnapshot(controller, request); 1566 } 1567 1568 @Override 1569 public MasterProtos.IsSnapshotDoneResponse isSnapshotDone(RpcController controller, 1570 MasterProtos.IsSnapshotDoneRequest request) throws ServiceException { 1571 return stub.isSnapshotDone(controller, request); 1572 } 1573 1574 @Override 1575 public MasterProtos.RestoreSnapshotResponse restoreSnapshot( 1576 RpcController controller, MasterProtos.RestoreSnapshotRequest request) 1577 throws ServiceException { 1578 return stub.restoreSnapshot(controller, request); 1579 } 1580 1581 @Override 1582 public MasterProtos.SetSnapshotCleanupResponse switchSnapshotCleanup( 1583 RpcController controller, MasterProtos.SetSnapshotCleanupRequest request) 1584 throws ServiceException { 1585 return stub.switchSnapshotCleanup(controller, request); 1586 } 1587 1588 @Override 1589 public MasterProtos.IsSnapshotCleanupEnabledResponse isSnapshotCleanupEnabled( 1590 RpcController controller, MasterProtos.IsSnapshotCleanupEnabledRequest request) 1591 throws ServiceException { 1592 return stub.isSnapshotCleanupEnabled(controller, request); 1593 } 1594 1595 @Override 1596 public 
MasterProtos.ExecProcedureResponse execProcedure( 1597 RpcController controller, MasterProtos.ExecProcedureRequest request) 1598 throws ServiceException { 1599 return stub.execProcedure(controller, request); 1600 } 1601 1602 @Override 1603 public MasterProtos.ExecProcedureResponse execProcedureWithRet( 1604 RpcController controller, MasterProtos.ExecProcedureRequest request) 1605 throws ServiceException { 1606 return stub.execProcedureWithRet(controller, request); 1607 } 1608 1609 @Override 1610 public MasterProtos.IsProcedureDoneResponse isProcedureDone(RpcController controller, 1611 MasterProtos.IsProcedureDoneRequest request) throws ServiceException { 1612 return stub.isProcedureDone(controller, request); 1613 } 1614 1615 @Override 1616 public MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller, 1617 MasterProtos.GetProcedureResultRequest request) throws ServiceException { 1618 return stub.getProcedureResult(controller, request); 1619 } 1620 1621 @Override 1622 public MasterProtos.IsMasterRunningResponse isMasterRunning( 1623 RpcController controller, MasterProtos.IsMasterRunningRequest request) 1624 throws ServiceException { 1625 return stub.isMasterRunning(controller, request); 1626 } 1627 1628 @Override 1629 public MasterProtos.ModifyNamespaceResponse modifyNamespace(RpcController controller, 1630 MasterProtos.ModifyNamespaceRequest request) 1631 throws ServiceException { 1632 return stub.modifyNamespace(controller, request); 1633 } 1634 1635 @Override 1636 public MasterProtos.CreateNamespaceResponse createNamespace( 1637 RpcController controller, 1638 MasterProtos.CreateNamespaceRequest request) throws ServiceException { 1639 return stub.createNamespace(controller, request); 1640 } 1641 1642 @Override 1643 public MasterProtos.DeleteNamespaceResponse deleteNamespace( 1644 RpcController controller, 1645 MasterProtos.DeleteNamespaceRequest request) throws ServiceException { 1646 return stub.deleteNamespace(controller, request); 
1647 } 1648 1649 @Override 1650 public MasterProtos.ListNamespacesResponse listNamespaces( 1651 RpcController controller, 1652 MasterProtos.ListNamespacesRequest request) throws ServiceException { 1653 return stub.listNamespaces(controller, request); 1654 } 1655 1656 @Override 1657 public MasterProtos.GetNamespaceDescriptorResponse getNamespaceDescriptor( 1658 RpcController controller, 1659 MasterProtos.GetNamespaceDescriptorRequest request) throws ServiceException { 1660 return stub.getNamespaceDescriptor(controller, request); 1661 } 1662 1663 @Override 1664 public MasterProtos.ListNamespaceDescriptorsResponse listNamespaceDescriptors( 1665 RpcController controller, 1666 MasterProtos.ListNamespaceDescriptorsRequest request) throws ServiceException { 1667 return stub.listNamespaceDescriptors(controller, request); 1668 } 1669 1670 @Override 1671 public MasterProtos.ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace( 1672 RpcController controller, MasterProtos.ListTableDescriptorsByNamespaceRequest request) 1673 throws ServiceException { 1674 return stub.listTableDescriptorsByNamespace(controller, request); 1675 } 1676 1677 @Override 1678 public MasterProtos.ListTableNamesByNamespaceResponse listTableNamesByNamespace( 1679 RpcController controller, MasterProtos.ListTableNamesByNamespaceRequest request) 1680 throws ServiceException { 1681 return stub.listTableNamesByNamespace(controller, request); 1682 } 1683 1684 @Override 1685 public MasterProtos.GetTableStateResponse getTableState( 1686 RpcController controller, MasterProtos.GetTableStateRequest request) 1687 throws ServiceException { 1688 return stub.getTableState(controller, request); 1689 } 1690 1691 @Override 1692 public void close() { 1693 release(this.mss); 1694 } 1695 1696 @Override 1697 public MasterProtos.GetSchemaAlterStatusResponse getSchemaAlterStatus( 1698 RpcController controller, MasterProtos.GetSchemaAlterStatusRequest request) 1699 throws ServiceException { 1700 return 
stub.getSchemaAlterStatus(controller, request); 1701 } 1702 1703 @Override 1704 public MasterProtos.GetTableDescriptorsResponse getTableDescriptors( 1705 RpcController controller, MasterProtos.GetTableDescriptorsRequest request) 1706 throws ServiceException { 1707 return stub.getTableDescriptors(controller, request); 1708 } 1709 1710 @Override 1711 public MasterProtos.GetTableNamesResponse getTableNames( 1712 RpcController controller, MasterProtos.GetTableNamesRequest request) 1713 throws ServiceException { 1714 return stub.getTableNames(controller, request); 1715 } 1716 1717 @Override 1718 public MasterProtos.GetClusterStatusResponse getClusterStatus( 1719 RpcController controller, MasterProtos.GetClusterStatusRequest request) 1720 throws ServiceException { 1721 return stub.getClusterStatus(controller, request); 1722 } 1723 1724 @Override 1725 public MasterProtos.SetQuotaResponse setQuota( 1726 RpcController controller, MasterProtos.SetQuotaRequest request) 1727 throws ServiceException { 1728 return stub.setQuota(controller, request); 1729 } 1730 1731 @Override 1732 public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp( 1733 RpcController controller, MasterProtos.MajorCompactionTimestampRequest request) 1734 throws ServiceException { 1735 return stub.getLastMajorCompactionTimestamp(controller, request); 1736 } 1737 1738 @Override 1739 public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestampForRegion( 1740 RpcController controller, MasterProtos.MajorCompactionTimestampForRegionRequest request) 1741 throws ServiceException { 1742 return stub.getLastMajorCompactionTimestampForRegion(controller, request); 1743 } 1744 1745 @Override 1746 public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller, 1747 IsBalancerEnabledRequest request) throws ServiceException { 1748 return stub.isBalancerEnabled(controller, request); 1749 } 1750 1751 @Override 1752 public 
MasterProtos.SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled( 1753 RpcController controller, MasterProtos.SetSplitOrMergeEnabledRequest request) 1754 throws ServiceException { 1755 return stub.setSplitOrMergeEnabled(controller, request); 1756 } 1757 1758 @Override 1759 public MasterProtos.IsSplitOrMergeEnabledResponse isSplitOrMergeEnabled( 1760 RpcController controller, MasterProtos.IsSplitOrMergeEnabledRequest request) 1761 throws ServiceException { 1762 return stub.isSplitOrMergeEnabled(controller, request); 1763 } 1764 1765 @Override 1766 public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller, 1767 IsNormalizerEnabledRequest request) throws ServiceException { 1768 return stub.isNormalizerEnabled(controller, request); 1769 } 1770 1771 @Override 1772 public SecurityCapabilitiesResponse getSecurityCapabilities(RpcController controller, 1773 SecurityCapabilitiesRequest request) throws ServiceException { 1774 return stub.getSecurityCapabilities(controller, request); 1775 } 1776 1777 @Override 1778 public AddReplicationPeerResponse addReplicationPeer(RpcController controller, 1779 AddReplicationPeerRequest request) throws ServiceException { 1780 return stub.addReplicationPeer(controller, request); 1781 } 1782 1783 @Override 1784 public RemoveReplicationPeerResponse removeReplicationPeer(RpcController controller, 1785 RemoveReplicationPeerRequest request) throws ServiceException { 1786 return stub.removeReplicationPeer(controller, request); 1787 } 1788 1789 @Override 1790 public EnableReplicationPeerResponse enableReplicationPeer(RpcController controller, 1791 EnableReplicationPeerRequest request) throws ServiceException { 1792 return stub.enableReplicationPeer(controller, request); 1793 } 1794 1795 @Override 1796 public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller, 1797 DisableReplicationPeerRequest request) throws ServiceException { 1798 return stub.disableReplicationPeer(controller, request); 1799 } 
1800 1801 @Override 1802 public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(RpcController controller, 1803 ListDecommissionedRegionServersRequest request) throws ServiceException { 1804 return stub.listDecommissionedRegionServers(controller, request); 1805 } 1806 1807 @Override 1808 public DecommissionRegionServersResponse decommissionRegionServers(RpcController controller, 1809 DecommissionRegionServersRequest request) throws ServiceException { 1810 return stub.decommissionRegionServers(controller, request); 1811 } 1812 1813 @Override 1814 public RecommissionRegionServerResponse recommissionRegionServer( 1815 RpcController controller, RecommissionRegionServerRequest request) 1816 throws ServiceException { 1817 return stub.recommissionRegionServer(controller, request); 1818 } 1819 1820 @Override 1821 public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller, 1822 GetReplicationPeerConfigRequest request) throws ServiceException { 1823 return stub.getReplicationPeerConfig(controller, request); 1824 } 1825 1826 @Override 1827 public UpdateReplicationPeerConfigResponse updateReplicationPeerConfig( 1828 RpcController controller, UpdateReplicationPeerConfigRequest request) 1829 throws ServiceException { 1830 return stub.updateReplicationPeerConfig(controller, request); 1831 } 1832 1833 @Override 1834 public ListReplicationPeersResponse listReplicationPeers(RpcController controller, 1835 ListReplicationPeersRequest request) throws ServiceException { 1836 return stub.listReplicationPeers(controller, request); 1837 } 1838 1839 @Override 1840 public GetSpaceQuotaRegionSizesResponse getSpaceQuotaRegionSizes( 1841 RpcController controller, GetSpaceQuotaRegionSizesRequest request) 1842 throws ServiceException { 1843 return stub.getSpaceQuotaRegionSizes(controller, request); 1844 } 1845 1846 @Override 1847 public GetQuotaStatesResponse getQuotaStates( 1848 RpcController controller, GetQuotaStatesRequest request) throws 
ServiceException { 1849 return stub.getQuotaStates(controller, request); 1850 } 1851 1852 @Override 1853 public MasterProtos.ClearDeadServersResponse clearDeadServers(RpcController controller, 1854 MasterProtos.ClearDeadServersRequest request) throws ServiceException { 1855 return stub.clearDeadServers(controller, request); 1856 } 1857 1858 @Override 1859 public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, 1860 SwitchRpcThrottleRequest request) throws ServiceException { 1861 return stub.switchRpcThrottle(controller, request); 1862 } 1863 1864 @Override 1865 public IsRpcThrottleEnabledResponse isRpcThrottleEnabled(RpcController controller, 1866 IsRpcThrottleEnabledRequest request) throws ServiceException { 1867 return stub.isRpcThrottleEnabled(controller, request); 1868 } 1869 1870 @Override 1871 public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller, 1872 SwitchExceedThrottleQuotaRequest request) throws ServiceException { 1873 return stub.switchExceedThrottleQuota(controller, request); 1874 } 1875 1876 @Override 1877 public AccessControlProtos.GrantResponse grant(RpcController controller, 1878 AccessControlProtos.GrantRequest request) throws ServiceException { 1879 return stub.grant(controller, request); 1880 } 1881 1882 @Override 1883 public AccessControlProtos.RevokeResponse revoke(RpcController controller, 1884 AccessControlProtos.RevokeRequest request) throws ServiceException { 1885 return stub.revoke(controller, request); 1886 } 1887 1888 @Override 1889 public GetUserPermissionsResponse getUserPermissions(RpcController controller, 1890 GetUserPermissionsRequest request) throws ServiceException { 1891 return stub.getUserPermissions(controller, request); 1892 } 1893 1894 @Override 1895 public HasUserPermissionsResponse hasUserPermissions(RpcController controller, 1896 HasUserPermissionsRequest request) throws ServiceException { 1897 return stub.hasUserPermissions(controller, request); 1898 } 1899 1900 
@Override 1901 public HBaseProtos.LogEntry getLogEntries(RpcController controller, 1902 HBaseProtos.LogRequest request) throws ServiceException { 1903 return stub.getLogEntries(controller, request); 1904 } 1905 }; 1906 } 1907 1908 private static void release(MasterServiceState mss) { 1909 if (mss != null && mss.connection != null) { 1910 ((ConnectionImplementation)mss.connection).releaseMaster(mss); 1911 } 1912 } 1913 1914 private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) { 1915 if (mss.getStub() == null){ 1916 return false; 1917 } 1918 try { 1919 return mss.isMasterRunning(); 1920 } catch (UndeclaredThrowableException e) { 1921 // It's somehow messy, but we can receive exceptions such as 1922 // java.net.ConnectException but they're not declared. So we catch it... 1923 LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable()); 1924 return false; 1925 } catch (IOException se) { 1926 LOG.warn("Checking master connection", se); 1927 return false; 1928 } 1929 } 1930 1931 void releaseMaster(MasterServiceState mss) { 1932 if (mss.getStub() == null) { 1933 return; 1934 } 1935 synchronized (masterLock) { 1936 --mss.userCount; 1937 } 1938 } 1939 1940 private void closeMasterService(MasterServiceState mss) { 1941 if (mss.getStub() != null) { 1942 LOG.info("Closing master protocol: " + mss); 1943 mss.clearStub(); 1944 } 1945 mss.userCount = 0; 1946 } 1947 1948 /** 1949 * Immediate close of the shared master. Can be by the delayed close or when closing the 1950 * connection itself. 
1951 */ 1952 private void closeMaster() { 1953 synchronized (masterLock) { 1954 closeMasterService(masterServiceState); 1955 } 1956 } 1957 1958 void updateCachedLocation(RegionInfo hri, ServerName source, ServerName serverName, long seqNum) { 1959 HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum); 1960 cacheLocation(hri.getTable(), source, newHrl); 1961 } 1962 1963 @Override 1964 public void deleteCachedRegionLocation(final HRegionLocation location) { 1965 metaCache.clearCache(location); 1966 } 1967 1968 /** 1969 * Update the location with the new value (if the exception is a RegionMovedException) 1970 * or delete it from the cache. Does nothing if we can be sure from the exception that 1971 * the location is still accurate, or if the cache has already been updated. 1972 * @param exception an object (to simplify user code) on which we will try to find a nested 1973 * or wrapped or both RegionMovedException 1974 * @param source server that is the source of the location update. 1975 */ 1976 @Override 1977 public void updateCachedLocations(final TableName tableName, byte[] regionName, byte[] rowkey, 1978 final Object exception, final ServerName source) { 1979 if (rowkey == null || tableName == null) { 1980 LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) + 1981 ", tableName=" + (tableName == null ? "null" : tableName)); 1982 return; 1983 } 1984 1985 if (source == null) { 1986 // This should not happen, but let's secure ourselves. 1987 return; 1988 } 1989 1990 if (regionName == null) { 1991 // we do not know which region, so just remove the cache entry for the row and server 1992 if (metrics != null) { 1993 metrics.incrCacheDroppingExceptions(exception); 1994 } 1995 metaCache.clearCache(tableName, rowkey, source); 1996 return; 1997 } 1998 1999 // Is it something we have already updated? 
2000 final RegionLocations oldLocations = getCachedLocation(tableName, rowkey); 2001 HRegionLocation oldLocation = null; 2002 if (oldLocations != null) { 2003 oldLocation = oldLocations.getRegionLocationByRegionName(regionName); 2004 } 2005 if (oldLocation == null || !source.equals(oldLocation.getServerName())) { 2006 // There is no such location in the cache (it's been removed already) or 2007 // the cache has already been refreshed with a different location. => nothing to do 2008 return; 2009 } 2010 2011 RegionInfo regionInfo = oldLocation.getRegion(); 2012 Throwable cause = ClientExceptionsUtil.findException(exception); 2013 if (cause != null) { 2014 if (!ClientExceptionsUtil.isMetaClearingException(cause)) { 2015 // We know that the region is still on this region server 2016 return; 2017 } 2018 2019 if (cause instanceof RegionMovedException) { 2020 RegionMovedException rme = (RegionMovedException) cause; 2021 if (LOG.isTraceEnabled()) { 2022 LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " + 2023 rme.getHostname() + ":" + rme.getPort() + 2024 " according to " + source.getAddress()); 2025 } 2026 // We know that the region is not anymore on this region server, but we know 2027 // the new location. 2028 updateCachedLocation( 2029 regionInfo, source, rme.getServerName(), rme.getLocationSeqNum()); 2030 return; 2031 } 2032 } 2033 2034 if (metrics != null) { 2035 metrics.incrCacheDroppingExceptions(exception); 2036 } 2037 2038 // Tell metaReplicaSelector that the location is stale. It will create a stale entry 2039 // with timestamp internally. Next time the client looks up the same location, 2040 // it will pick a different meta replica region. 2041 if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) { 2042 metaReplicaSelector.onError(oldLocation); 2043 } 2044 2045 // If we're here, it means that can cannot be sure about the location, so we remove it from 2046 // the cache. 
Do not send the source because source can be a new server in the same host:port 2047 metaCache.clearCache(regionInfo); 2048 } 2049 2050 @Override 2051 public AsyncProcess getAsyncProcess() { 2052 return asyncProcess; 2053 } 2054 2055 @Override 2056 public ServerStatisticTracker getStatisticsTracker() { 2057 return this.stats; 2058 } 2059 2060 @Override 2061 public ClientBackoffPolicy getBackoffPolicy() { 2062 return this.backoffPolicy; 2063 } 2064 2065 /* 2066 * Return the number of cached region for a table. It will only be called 2067 * from a unit test. 2068 */ 2069 int getNumberOfCachedRegionLocations(final TableName tableName) { 2070 return metaCache.getNumberOfCachedRegionLocations(tableName); 2071 } 2072 2073 @Override 2074 public void abort(final String msg, Throwable t) { 2075 if (t != null) { 2076 LOG.error(HBaseMarkers.FATAL, msg, t); 2077 } else { 2078 LOG.error(HBaseMarkers.FATAL, msg); 2079 } 2080 this.aborted = true; 2081 close(); 2082 this.closed = true; 2083 } 2084 2085 @Override 2086 public boolean isClosed() { 2087 return this.closed; 2088 } 2089 2090 @Override 2091 public boolean isAborted(){ 2092 return this.aborted; 2093 } 2094 2095 @Override 2096 public void close() { 2097 if (this.closed) { 2098 return; 2099 } 2100 closeMaster(); 2101 shutdownPools(); 2102 if (this.metrics != null) { 2103 this.metrics.shutdown(); 2104 } 2105 this.closed = true; 2106 registry.close(); 2107 this.stubs.clear(); 2108 if (clusterStatusListener != null) { 2109 clusterStatusListener.close(); 2110 } 2111 if (rpcClient != null) { 2112 rpcClient.close(); 2113 } 2114 if (choreService != null) { 2115 choreService.shutdown(); 2116 } 2117 } 2118 2119 /** 2120 * Close the connection for good. On the off chance that someone is unable to close 2121 * the connection, perhaps because it bailed out prematurely, the method 2122 * below will ensure that this instance is cleaned up. 
2123 * Caveat: The JVM may take an unknown amount of time to call finalize on an 2124 * unreachable object, so our hope is that every consumer cleans up after 2125 * itself, like any good citizen. 2126 */ 2127 @Override 2128 protected void finalize() throws Throwable { 2129 super.finalize(); 2130 close(); 2131 } 2132 2133 @Override 2134 public NonceGenerator getNonceGenerator() { 2135 return nonceGenerator; 2136 } 2137 2138 @Override 2139 public TableState getTableState(TableName tableName) throws IOException { 2140 checkClosed(); 2141 TableState tableState = MetaTableAccessor.getTableState(this, tableName); 2142 if (tableState == null) { 2143 throw new TableNotFoundException(tableName); 2144 } 2145 return tableState; 2146 } 2147 2148 @Override 2149 public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) { 2150 return RpcRetryingCallerFactory 2151 .instantiate(conf, this.interceptor, this.getStatisticsTracker()); 2152 } 2153 2154 @Override 2155 public boolean hasCellBlockSupport() { 2156 return this.rpcClient.hasCellBlockSupport(); 2157 } 2158 2159 @Override 2160 public ConnectionConfiguration getConnectionConfiguration() { 2161 return this.connectionConfig; 2162 } 2163 2164 @Override 2165 public RpcRetryingCallerFactory getRpcRetryingCallerFactory() { 2166 return this.rpcCallerFactory; 2167 } 2168 2169 @Override 2170 public RpcControllerFactory getRpcControllerFactory() { 2171 return this.rpcControllerFactory; 2172 } 2173 2174 private static <T> T get(CompletableFuture<T> future) throws IOException { 2175 try { 2176 return future.get(); 2177 } catch (InterruptedException e) { 2178 Thread.currentThread().interrupt(); 2179 throw (IOException) new InterruptedIOException().initCause(e); 2180 } catch (ExecutionException e) { 2181 Throwable cause = e.getCause(); 2182 Throwables.propagateIfPossible(cause, IOException.class); 2183 throw new IOException(cause); 2184 } 2185 } 2186}