001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED; 021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT; 022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS; 023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; 028import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY; 029import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 030 031import io.opentelemetry.api.trace.Span; 032import java.io.IOException; 033import java.net.SocketAddress; 034import java.util.Optional; 035import java.util.concurrent.CompletableFuture; 036import java.util.concurrent.ConcurrentHashMap; 037import java.util.concurrent.ConcurrentMap; 038import java.util.concurrent.ExecutionException; 039import java.util.concurrent.ExecutorService; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicReference; 043import org.apache.commons.io.IOUtils; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.hbase.AuthUtil; 046import org.apache.hadoop.hbase.ChoreService; 047import org.apache.hadoop.hbase.MasterNotRunningException; 048import org.apache.hadoop.hbase.ServerName; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 051import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 052import org.apache.hadoop.hbase.ipc.RpcClient; 053import org.apache.hadoop.hbase.ipc.RpcClientFactory; 054import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 055import org.apache.hadoop.hbase.security.User; 056import org.apache.hadoop.hbase.trace.TraceUtil; 057import org.apache.hadoop.hbase.util.ConcurrentMapUtils; 058import org.apache.hadoop.hbase.util.Threads; 059import org.apache.hadoop.security.UserGroupInformation; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 065import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 066 067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 069import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 071 072/** 073 * The implementation of AsyncConnection. 074 */ 075@InterfaceAudience.Private 076public class AsyncConnectionImpl implements AsyncConnection { 077 078 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); 079 080 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( 081 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) 082 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 083 10, TimeUnit.MILLISECONDS); 084 085 private final Configuration conf; 086 087 final AsyncConnectionConfiguration connConf; 088 089 protected final User user; 090 091 final ConnectionRegistry registry; 092 093 protected final int rpcTimeout; 094 095 protected final RpcClient rpcClient; 096 097 final RpcControllerFactory rpcControllerFactory; 098 099 private final AsyncRegionLocator locator; 100 101 final AsyncRpcRetryingCallerFactory callerFactory; 102 103 private final NonceGenerator nonceGenerator; 104 105 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); 106 private final ConcurrentMap<String, AdminService.Interface> adminStubs = 107 new ConcurrentHashMap<>(); 108 109 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); 110 111 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture = 112 new AtomicReference<>(); 113 114 private final Optional<ServerStatisticTracker> stats; 115 private final ClientBackoffPolicy backoffPolicy; 116 117 private ChoreService choreService; 118 119 private final AtomicBoolean closed = new AtomicBoolean(false); 120 121 private final String metricsScope; 122 private final Optional<MetricsConnection> metrics; 123 124 private final ClusterStatusListener clusterStatusListener; 125 126 private volatile ConnectionOverAsyncConnection conn; 127 128 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, 129 SocketAddress localAddress, User user) { 130 this.conf = conf; 131 this.user = user; 132 this.metricsScope = MetricsConnection.getScope(conf, clusterId, this); 133 134 if (user.isLoginFromKeytab()) { 135 spawnRenewalChore(user.getUGI()); 136 } 137 this.connConf = new AsyncConnectionConfiguration(conf); 138 this.registry = registry; 139 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { 140 this.metrics = 141 Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null)); 142 } else { 143 this.metrics = Optional.empty(); 144 } 145 this.rpcClient = 146 RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null)); 147 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 148 this.rpcTimeout = 149 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); 150 this.locator = new AsyncRegionLocator(this, RETRY_TIMER); 151 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); 152 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { 153 nonceGenerator = PerClientRandomNonceGenerator.get(); 154 } else { 155 nonceGenerator = NO_NONCE_GENERATOR; 156 } 157 this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf)); 158 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); 159 ClusterStatusListener listener = null; 160 if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) { 161 // TODO: this maybe a blocking operation, better to create it outside the constructor and pass 162 // it in, just like clusterId. Not a big problem for now as the default value is false. 163 Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass( 164 STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); 165 if (listenerClass == null) { 166 LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS); 167 } else { 168 try { 169 listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() { 170 @Override 171 public void newDead(ServerName sn) { 172 locator.clearCache(sn); 173 rpcClient.cancelConnections(sn); 174 } 175 }, conf, listenerClass); 176 } catch (IOException e) { 177 LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e); 178 } 179 } 180 } 181 this.clusterStatusListener = listener; 182 } 183 184 private void spawnRenewalChore(final UserGroupInformation user) { 185 ChoreService service = getChoreService(); 186 service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf)); 187 } 188 189 /** 190 * If choreService has not been created yet, create the ChoreService. 191 */ 192 synchronized ChoreService getChoreService() { 193 if (isClosed()) { 194 throw new IllegalStateException("connection is already closed"); 195 } 196 if (choreService == null) { 197 choreService = new ChoreService("AsyncConn Chore Service"); 198 } 199 return choreService; 200 } 201 202 public User getUser() { 203 return user; 204 } 205 206 public ConnectionRegistry getConnectionRegistry() { 207 return registry; 208 } 209 210 @Override 211 public Configuration getConfiguration() { 212 return conf; 213 } 214 215 @Override 216 public boolean isClosed() { 217 return closed.get(); 218 } 219 220 @Override 221 public void close() { 222 TraceUtil.trace(() -> { 223 if (!closed.compareAndSet(false, true)) { 224 return; 225 } 226 LOG.info("Connection has been closed by {}.", Thread.currentThread().getName()); 227 if (LOG.isDebugEnabled()) { 228 logCallStack(Thread.currentThread().getStackTrace()); 229 } 230 IOUtils.closeQuietly(clusterStatusListener, 231 e -> LOG.warn("failed to close clusterStatusListener", e)); 232 IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e)); 233 IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e)); 234 synchronized (this) { 235 if (choreService != null) { 236 choreService.shutdown(); 237 choreService = null; 238 } 239 } 240 if (metrics.isPresent()) { 241 MetricsConnection.deleteMetricsConnection(metricsScope); 242 } 243 ConnectionOverAsyncConnection c = this.conn; 244 if (c != null) { 245 c.closePool(); 246 } 247 }, "AsyncConnection.close"); 248 } 249 250 private void logCallStack(StackTraceElement[] stackTraceElements) { 251 StringBuilder stackBuilder = new StringBuilder("Call stack:"); 252 for (StackTraceElement element : stackTraceElements) { 253 stackBuilder.append("\n at "); 254 stackBuilder.append(element); 255 } 256 stackBuilder.append("\n"); 257 LOG.debug(stackBuilder.toString()); 258 } 259 260 @Override 261 public AsyncTableRegionLocator getRegionLocator(TableName tableName) { 262 return new AsyncTableRegionLocatorImpl(tableName, this); 263 } 264 265 @Override 266 public void clearRegionLocationCache() { 267 locator.clearCache(); 268 } 269 270 // we will override this method for testing retry caller, so do not remove this method. 271 AsyncRegionLocator getLocator() { 272 return locator; 273 } 274 275 // ditto 276 NonceGenerator getNonceGenerator() { 277 return nonceGenerator; 278 } 279 280 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { 281 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 282 } 283 284 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 285 return ConcurrentMapUtils.computeIfAbsentEx(rsStubs, 286 getStubKey(ClientService.getDescriptor().getName(), serverName), 287 () -> createRegionServerStub(serverName)); 288 } 289 290 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException { 291 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 292 } 293 294 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException { 295 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 296 } 297 298 AdminService.Interface getAdminStub(ServerName serverName) throws IOException { 299 return ConcurrentMapUtils.computeIfAbsentEx(adminStubs, 300 getStubKey(AdminService.getDescriptor().getName(), serverName), 301 () -> createAdminServerStub(serverName)); 302 } 303 304 CompletableFuture<MasterService.Interface> getMasterStub() { 305 return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { 306 CompletableFuture<MasterService.Interface> future = new CompletableFuture<>(); 307 addListener(registry.getActiveMaster(), (addr, error) -> { 308 if (error != null) { 309 future.completeExceptionally(error); 310 } else if (addr == null) { 311 future.completeExceptionally(new MasterNotRunningException( 312 "ZooKeeper available but no active master location found")); 313 } else { 314 LOG.debug("The fetched master address is {}", addr); 315 try { 316 future.complete(createMasterStub(addr)); 317 } catch (IOException e) { 318 future.completeExceptionally(e); 319 } 320 } 321 322 }); 323 return future; 324 }, stub -> true, "master stub"); 325 } 326 327 String getClusterId() { 328 try { 329 return registry.getClusterId().get(); 330 } catch (InterruptedException | ExecutionException e) { 331 LOG.error("Error fetching cluster ID: ", e); 332 } 333 return null; 334 } 335 336 void clearMasterStubCache(MasterService.Interface stub) { 337 masterStub.compareAndSet(stub, null); 338 } 339 340 Optional<ServerStatisticTracker> getStatisticsTracker() { 341 return stats; 342 } 343 344 ClientBackoffPolicy getBackoffPolicy() { 345 return backoffPolicy; 346 } 347 348 @Override 349 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) { 350 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) { 351 352 @Override 353 public AsyncTable<AdvancedScanResultConsumer> build() { 354 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 355 } 356 }; 357 } 358 359 @Override 360 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, 361 ExecutorService pool) { 362 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { 363 364 @Override 365 public AsyncTable<ScanResultConsumer> build() { 366 RawAsyncTableImpl rawTable = 367 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 368 return new AsyncTableImpl(rawTable, pool); 369 } 370 }; 371 } 372 373 @Override 374 public AsyncAdminBuilder getAdminBuilder() { 375 return new AsyncAdminBuilderBase(connConf) { 376 @Override 377 public AsyncAdmin build() { 378 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 379 } 380 }; 381 } 382 383 @Override 384 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { 385 return new AsyncAdminBuilderBase(connConf) { 386 @Override 387 public AsyncAdmin build() { 388 RawAsyncHBaseAdmin rawAdmin = 389 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 390 return new AsyncHBaseAdmin(rawAdmin, pool); 391 } 392 }; 393 } 394 395 @Override 396 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { 397 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER); 398 } 399 400 @Override 401 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, 402 ExecutorService pool) { 403 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), 404 RETRY_TIMER); 405 } 406 407 @Override 408 public Connection toConnection() { 409 ConnectionOverAsyncConnection c = this.conn; 410 if (c != null) { 411 return c; 412 } 413 synchronized (this) { 414 c = this.conn; 415 if (c != null) { 416 return c; 417 } 418 c = new ConnectionOverAsyncConnection(this); 419 this.conn = c; 420 } 421 return c; 422 } 423 424 private Hbck getHbckInternal(ServerName masterServer) { 425 Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName()); 426 // we will not create a new connection when creating a new protobuf stub, and for hbck there 427 // will be no performance consideration, so for simplification we will create a new stub every 428 // time instead of caching the stub here. 429 return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( 430 rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); 431 } 432 433 @Override 434 public CompletableFuture<Hbck> getHbck() { 435 return TraceUtil.tracedFuture(() -> { 436 CompletableFuture<Hbck> future = new CompletableFuture<>(); 437 addListener(registry.getActiveMaster(), (sn, error) -> { 438 if (error != null) { 439 future.completeExceptionally(error); 440 } else { 441 future.complete(getHbckInternal(sn)); 442 } 443 }); 444 return future; 445 }, "AsyncConnection.getHbck"); 446 } 447 448 @Override 449 public Hbck getHbck(ServerName masterServer) { 450 return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck"); 451 } 452 453 Optional<MetricsConnection> getConnectionMetrics() { 454 return metrics; 455 } 456}