001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED; 021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT; 022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS; 023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; 028import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY; 029import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 030 031import io.opentelemetry.api.trace.Span; 032import java.io.IOException; 033import java.net.SocketAddress; 034import java.util.Collections; 035import java.util.Map; 036import java.util.Optional; 037import java.util.concurrent.CompletableFuture; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.ConcurrentMap; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.ExecutorService; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.atomic.AtomicBoolean; 044import java.util.concurrent.atomic.AtomicReference; 045import org.apache.commons.io.IOUtils; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.hbase.AuthUtil; 048import org.apache.hadoop.hbase.ChoreService; 049import org.apache.hadoop.hbase.MasterNotRunningException; 050import org.apache.hadoop.hbase.ServerName; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 053import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 054import org.apache.hadoop.hbase.ipc.RpcClient; 055import org.apache.hadoop.hbase.ipc.RpcClientFactory; 056import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 057import org.apache.hadoop.hbase.security.User; 058import org.apache.hadoop.hbase.trace.TraceUtil; 059import org.apache.hadoop.hbase.util.ConcurrentMapUtils; 060import org.apache.hadoop.hbase.util.Threads; 061import org.apache.hadoop.security.UserGroupInformation; 062import org.apache.yetus.audience.InterfaceAudience; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 067import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 068 069import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 071import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 073 074/** 075 * The implementation of AsyncConnection. 076 */ 077@InterfaceAudience.Private 078public class AsyncConnectionImpl implements AsyncConnection { 079 080 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); 081 082 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( 083 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) 084 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 085 10, TimeUnit.MILLISECONDS); 086 087 private final Configuration conf; 088 089 final AsyncConnectionConfiguration connConf; 090 091 protected final User user; 092 093 final ConnectionRegistry registry; 094 095 protected final int rpcTimeout; 096 097 protected final RpcClient rpcClient; 098 099 final RpcControllerFactory rpcControllerFactory; 100 101 private final AsyncRegionLocator locator; 102 103 final AsyncRpcRetryingCallerFactory callerFactory; 104 105 private final NonceGenerator nonceGenerator; 106 107 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); 108 private final ConcurrentMap<String, AdminService.Interface> adminStubs = 109 new ConcurrentHashMap<>(); 110 111 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); 112 113 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture = 114 new AtomicReference<>(); 115 116 private final Optional<ServerStatisticTracker> stats; 117 private final ClientBackoffPolicy backoffPolicy; 118 119 private ChoreService choreService; 120 121 private final AtomicBoolean closed = new AtomicBoolean(false); 122 123 private final String metricsScope; 124 private final Optional<MetricsConnection> metrics; 125 126 private final ClusterStatusListener clusterStatusListener; 127 128 private volatile ConnectionOverAsyncConnection conn; 129 130 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, 131 SocketAddress localAddress, User user) { 132 this(conf, registry, clusterId, localAddress, user, Collections.emptyMap()); 133 } 134 135 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, 136 SocketAddress localAddress, User user, Map<String, byte[]> connectionAttributes) { 137 this.conf = conf; 138 this.user = user; 139 this.metricsScope = MetricsConnection.getScope(conf, clusterId, this); 140 141 if (user.isLoginFromKeytab()) { 142 spawnRenewalChore(user.getUGI()); 143 } 144 this.connConf = new AsyncConnectionConfiguration(conf); 145 this.registry = registry; 146 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { 147 this.metrics = Optional 148 .of(MetricsConnection.getMetricsConnection(conf, metricsScope, () -> null, () -> null)); 149 } else { 150 this.metrics = Optional.empty(); 151 } 152 this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, 153 metrics.orElse(null), connectionAttributes); 154 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 155 this.rpcTimeout = 156 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); 157 this.locator = new AsyncRegionLocator(this, RETRY_TIMER); 158 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); 159 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { 160 nonceGenerator = PerClientRandomNonceGenerator.get(); 161 } else { 162 nonceGenerator = NO_NONCE_GENERATOR; 163 } 164 this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf)); 165 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); 166 ClusterStatusListener listener = null; 167 if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) { 168 // TODO: this maybe a blocking operation, better to create it outside the constructor and pass 169 // it in, just like clusterId. Not a big problem for now as the default value is false. 170 Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass( 171 STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); 172 if (listenerClass == null) { 173 LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS); 174 } else { 175 try { 176 listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() { 177 @Override 178 public void newDead(ServerName sn) { 179 locator.clearCache(sn); 180 rpcClient.cancelConnections(sn); 181 } 182 }, conf, listenerClass); 183 } catch (IOException e) { 184 LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e); 185 } 186 } 187 } 188 this.clusterStatusListener = listener; 189 } 190 191 private void spawnRenewalChore(final UserGroupInformation user) { 192 ChoreService service = getChoreService(); 193 service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf)); 194 } 195 196 /** 197 * If choreService has not been created yet, create the ChoreService. 198 */ 199 synchronized ChoreService getChoreService() { 200 if (isClosed()) { 201 throw new IllegalStateException("connection is already closed"); 202 } 203 if (choreService == null) { 204 choreService = new ChoreService("AsyncConn Chore Service"); 205 } 206 return choreService; 207 } 208 209 public User getUser() { 210 return user; 211 } 212 213 public ConnectionRegistry getConnectionRegistry() { 214 return registry; 215 } 216 217 @Override 218 public Configuration getConfiguration() { 219 return conf; 220 } 221 222 @Override 223 public boolean isClosed() { 224 return closed.get(); 225 } 226 227 @Override 228 public void close() { 229 TraceUtil.trace(() -> { 230 if (!closed.compareAndSet(false, true)) { 231 return; 232 } 233 LOG.info("Connection has been closed by {}.", Thread.currentThread().getName()); 234 if (LOG.isDebugEnabled()) { 235 logCallStack(Thread.currentThread().getStackTrace()); 236 } 237 IOUtils.closeQuietly(clusterStatusListener, 238 e -> LOG.warn("failed to close clusterStatusListener", e)); 239 IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e)); 240 IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e)); 241 synchronized (this) { 242 if (choreService != null) { 243 choreService.shutdown(); 244 choreService = null; 245 } 246 } 247 if (metrics.isPresent()) { 248 MetricsConnection.deleteMetricsConnection(metricsScope); 249 } 250 ConnectionOverAsyncConnection c = this.conn; 251 if (c != null) { 252 c.closePool(); 253 } 254 }, "AsyncConnection.close"); 255 } 256 257 private void logCallStack(StackTraceElement[] stackTraceElements) { 258 StringBuilder stackBuilder = new StringBuilder("Call stack:"); 259 for (StackTraceElement element : stackTraceElements) { 260 stackBuilder.append("\n at "); 261 stackBuilder.append(element); 262 } 263 stackBuilder.append("\n"); 264 LOG.debug(stackBuilder.toString()); 265 } 266 267 @Override 268 public AsyncTableRegionLocator getRegionLocator(TableName tableName) { 269 return new AsyncTableRegionLocatorImpl(tableName, this); 270 } 271 272 @Override 273 public void clearRegionLocationCache() { 274 locator.clearCache(); 275 } 276 277 // we will override this method for testing retry caller, so do not remove this method. 278 AsyncRegionLocator getLocator() { 279 return locator; 280 } 281 282 // ditto 283 NonceGenerator getNonceGenerator() { 284 return nonceGenerator; 285 } 286 287 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { 288 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 289 } 290 291 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 292 return ConcurrentMapUtils.computeIfAbsentEx(rsStubs, 293 getStubKey(ClientService.getDescriptor().getName(), serverName), 294 () -> createRegionServerStub(serverName)); 295 } 296 297 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException { 298 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 299 } 300 301 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException { 302 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 303 } 304 305 AdminService.Interface getAdminStub(ServerName serverName) throws IOException { 306 return ConcurrentMapUtils.computeIfAbsentEx(adminStubs, 307 getStubKey(AdminService.getDescriptor().getName(), serverName), 308 () -> createAdminServerStub(serverName)); 309 } 310 311 CompletableFuture<MasterService.Interface> getMasterStub() { 312 return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { 313 CompletableFuture<MasterService.Interface> future = new CompletableFuture<>(); 314 addListener(registry.getActiveMaster(), (addr, error) -> { 315 if (error != null) { 316 future.completeExceptionally(error); 317 } else if (addr == null) { 318 future.completeExceptionally(new MasterNotRunningException( 319 "ZooKeeper available but no active master location found")); 320 } else { 321 LOG.debug("The fetched master address is {}", addr); 322 try { 323 future.complete(createMasterStub(addr)); 324 } catch (IOException e) { 325 future.completeExceptionally(e); 326 } 327 } 328 329 }); 330 return future; 331 }, stub -> true, "master stub"); 332 } 333 334 String getClusterId() { 335 try { 336 return registry.getClusterId().get(); 337 } catch (InterruptedException | ExecutionException e) { 338 LOG.error("Error fetching cluster ID: ", e); 339 } 340 return null; 341 } 342 343 void clearMasterStubCache(MasterService.Interface stub) { 344 masterStub.compareAndSet(stub, null); 345 } 346 347 Optional<ServerStatisticTracker> getStatisticsTracker() { 348 return stats; 349 } 350 351 ClientBackoffPolicy getBackoffPolicy() { 352 return backoffPolicy; 353 } 354 355 @Override 356 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) { 357 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) { 358 359 @Override 360 public AsyncTable<AdvancedScanResultConsumer> build() { 361 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 362 } 363 }; 364 } 365 366 @Override 367 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, 368 ExecutorService pool) { 369 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { 370 371 @Override 372 public AsyncTable<ScanResultConsumer> build() { 373 RawAsyncTableImpl rawTable = 374 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 375 return new AsyncTableImpl(rawTable, pool); 376 } 377 }; 378 } 379 380 @Override 381 public AsyncAdminBuilder getAdminBuilder() { 382 return new AsyncAdminBuilderBase(connConf) { 383 @Override 384 public AsyncAdmin build() { 385 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 386 } 387 }; 388 } 389 390 @Override 391 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { 392 return new AsyncAdminBuilderBase(connConf) { 393 @Override 394 public AsyncAdmin build() { 395 RawAsyncHBaseAdmin rawAdmin = 396 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 397 return new AsyncHBaseAdmin(rawAdmin, pool); 398 } 399 }; 400 } 401 402 @Override 403 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { 404 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER); 405 } 406 407 @Override 408 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, 409 ExecutorService pool) { 410 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), 411 RETRY_TIMER); 412 } 413 414 @Override 415 public Connection toConnection() { 416 ConnectionOverAsyncConnection c = this.conn; 417 if (c != null) { 418 return c; 419 } 420 synchronized (this) { 421 c = this.conn; 422 if (c != null) { 423 return c; 424 } 425 c = new ConnectionOverAsyncConnection(this); 426 this.conn = c; 427 } 428 return c; 429 } 430 431 private Hbck getHbckInternal(ServerName masterServer) { 432 Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName()); 433 // we will not create a new connection when creating a new protobuf stub, and for hbck there 434 // will be no performance consideration, so for simplification we will create a new stub every 435 // time instead of caching the stub here. 436 return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( 437 rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); 438 } 439 440 @Override 441 public CompletableFuture<Hbck> getHbck() { 442 return TraceUtil.tracedFuture(() -> { 443 CompletableFuture<Hbck> future = new CompletableFuture<>(); 444 addListener(registry.getActiveMaster(), (sn, error) -> { 445 if (error != null) { 446 future.completeExceptionally(error); 447 } else { 448 future.complete(getHbckInternal(sn)); 449 } 450 }); 451 return future; 452 }, "AsyncConnection.getHbck"); 453 } 454 455 @Override 456 public Hbck getHbck(ServerName masterServer) { 457 return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck"); 458 } 459 460 Optional<MetricsConnection> getConnectionMetrics() { 461 return metrics; 462 } 463}