001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED; 021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT; 022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS; 023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; 028import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY; 029import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 030 031import io.opentelemetry.api.trace.Span; 032import java.io.IOException; 033import java.util.Optional; 034import java.util.concurrent.CompletableFuture; 035import java.util.concurrent.ConcurrentHashMap; 036import java.util.concurrent.ConcurrentMap; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.ExecutorService; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.atomic.AtomicReference; 041import org.apache.commons.io.IOUtils; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.hbase.AuthUtil; 044import org.apache.hadoop.hbase.ChoreService; 045import org.apache.hadoop.hbase.MasterNotRunningException; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 049import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 050import org.apache.hadoop.hbase.ipc.RpcClient; 051import org.apache.hadoop.hbase.ipc.RpcClientFactory; 052import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 053import org.apache.hadoop.hbase.security.User; 054import org.apache.hadoop.hbase.trace.TraceUtil; 055import org.apache.hadoop.hbase.util.ConcurrentMapUtils; 056import org.apache.hadoop.hbase.util.Threads; 057import org.apache.hadoop.security.UserGroupInformation; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 063import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 064 065import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 068import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 069 070/** 071 * The implementation of AsyncConnection. 072 */ 073@InterfaceAudience.Private 074public class AsyncConnectionImpl implements AsyncConnection { 075 076 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); 077 078 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( 079 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) 080 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 081 10, TimeUnit.MILLISECONDS); 082 083 private final Configuration conf; 084 085 final AsyncConnectionConfiguration connConf; 086 087 private final User user; 088 089 final ConnectionRegistry registry; 090 091 private final int rpcTimeout; 092 093 private final RpcClient rpcClient; 094 095 final RpcControllerFactory rpcControllerFactory; 096 097 private final AsyncRegionLocator locator; 098 099 final AsyncRpcRetryingCallerFactory callerFactory; 100 101 private final NonceGenerator nonceGenerator; 102 103 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); 104 private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>(); 105 106 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); 107 108 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture = 109 new AtomicReference<>(); 110 111 private final Optional<ServerStatisticTracker> stats; 112 private final ClientBackoffPolicy backoffPolicy; 113 114 private ChoreService choreService; 115 116 private volatile boolean closed = false; 117 118 private final Optional<MetricsConnection> metrics; 119 120 private final ClusterStatusListener clusterStatusListener; 121 122 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, 123 User user) { 124 this.conf = conf; 125 this.user = user; 126 127 if (user.isLoginFromKeytab()) { 128 spawnRenewalChore(user.getUGI()); 129 } 130 this.connConf = new AsyncConnectionConfiguration(conf); 131 this.registry = registry; 132 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { 133 String scope = MetricsConnection.getScope(conf, clusterId, this); 134 this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null)); 135 } else { 136 this.metrics = Optional.empty(); 137 } 138 this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null)); 139 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 140 this.rpcTimeout = 141 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); 142 this.locator = new AsyncRegionLocator(this, RETRY_TIMER); 143 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); 144 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { 145 nonceGenerator = PerClientRandomNonceGenerator.get(); 146 } else { 147 nonceGenerator = NO_NONCE_GENERATOR; 148 } 149 this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf)); 150 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); 151 ClusterStatusListener listener = null; 152 if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) { 153 // TODO: this maybe a blocking operation, better to create it outside the constructor and pass 154 // it in, just like clusterId. Not a big problem for now as the default value is false. 155 Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass( 156 STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); 157 if (listenerClass == null) { 158 LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS); 159 } else { 160 try { 161 listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() { 162 @Override 163 public void newDead(ServerName sn) { 164 locator.clearCache(sn); 165 rpcClient.cancelConnections(sn); 166 } 167 }, conf, listenerClass); 168 } catch (IOException e) { 169 LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e); 170 } 171 } 172 } 173 this.clusterStatusListener = listener; 174 } 175 176 private void spawnRenewalChore(final UserGroupInformation user) { 177 ChoreService service = getChoreService(); 178 service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf)); 179 } 180 181 /** 182 * If choreService has not been created yet, create the ChoreService. n 183 */ 184 synchronized ChoreService getChoreService() { 185 if (isClosed()) { 186 throw new IllegalStateException("connection is already closed"); 187 } 188 if (choreService == null) { 189 choreService = new ChoreService("AsyncConn Chore Service"); 190 } 191 return choreService; 192 } 193 194 public User getUser() { 195 return user; 196 } 197 198 public ConnectionRegistry getConnectionRegistry() { 199 return registry; 200 } 201 202 @Override 203 public Configuration getConfiguration() { 204 return conf; 205 } 206 207 @Override 208 public void close() { 209 TraceUtil.trace(() -> { 210 // As the code below is safe to be executed in parallel, here we do not use CAS or lock, 211 // just a simple volatile flag. 212 if (closed) { 213 return; 214 } 215 LOG.info("Connection has been closed by {}.", Thread.currentThread().getName()); 216 if (LOG.isDebugEnabled()) { 217 logCallStack(Thread.currentThread().getStackTrace()); 218 } 219 IOUtils.closeQuietly(clusterStatusListener, 220 e -> LOG.warn("failed to close clusterStatusListener", e)); 221 IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e)); 222 IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e)); 223 synchronized (this) { 224 if (choreService != null) { 225 choreService.shutdown(); 226 choreService = null; 227 } 228 } 229 metrics.ifPresent(MetricsConnection::shutdown); 230 closed = true; 231 }, "AsyncConnection.close"); 232 } 233 234 private void logCallStack(StackTraceElement[] stackTraceElements) { 235 StringBuilder stackBuilder = new StringBuilder("Call stack:"); 236 for (StackTraceElement element : stackTraceElements) { 237 stackBuilder.append("\n at "); 238 stackBuilder.append(element); 239 } 240 stackBuilder.append("\n"); 241 LOG.debug(stackBuilder.toString()); 242 } 243 244 @Override 245 public boolean isClosed() { 246 return closed; 247 } 248 249 @Override 250 public AsyncTableRegionLocator getRegionLocator(TableName tableName) { 251 return new AsyncTableRegionLocatorImpl(tableName, this); 252 } 253 254 // we will override this method for testing retry caller, so do not remove this method. 255 AsyncRegionLocator getLocator() { 256 return locator; 257 } 258 259 // ditto 260 public NonceGenerator getNonceGenerator() { 261 return nonceGenerator; 262 } 263 264 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { 265 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 266 } 267 268 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 269 return ConcurrentMapUtils.computeIfAbsentEx(rsStubs, 270 getStubKey(ClientService.getDescriptor().getName(), serverName), 271 () -> createRegionServerStub(serverName)); 272 } 273 274 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException { 275 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 276 } 277 278 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException { 279 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 280 } 281 282 AdminService.Interface getAdminStub(ServerName serverName) throws IOException { 283 return ConcurrentMapUtils.computeIfAbsentEx(adminSubs, 284 getStubKey(AdminService.getDescriptor().getName(), serverName), 285 () -> createAdminServerStub(serverName)); 286 } 287 288 CompletableFuture<MasterService.Interface> getMasterStub() { 289 return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { 290 CompletableFuture<MasterService.Interface> future = new CompletableFuture<>(); 291 addListener(registry.getActiveMaster(), (addr, error) -> { 292 if (error != null) { 293 future.completeExceptionally(error); 294 } else if (addr == null) { 295 future.completeExceptionally(new MasterNotRunningException( 296 "ZooKeeper available but no active master location found")); 297 } else { 298 LOG.debug("The fetched master address is {}", addr); 299 try { 300 future.complete(createMasterStub(addr)); 301 } catch (IOException e) { 302 future.completeExceptionally(e); 303 } 304 } 305 306 }); 307 return future; 308 }, stub -> true, "master stub"); 309 } 310 311 String getClusterId() { 312 try { 313 return registry.getClusterId().get(); 314 } catch (InterruptedException | ExecutionException e) { 315 LOG.error("Error fetching cluster ID: ", e); 316 } 317 return null; 318 } 319 320 void clearMasterStubCache(MasterService.Interface stub) { 321 masterStub.compareAndSet(stub, null); 322 } 323 324 Optional<ServerStatisticTracker> getStatisticsTracker() { 325 return stats; 326 } 327 328 ClientBackoffPolicy getBackoffPolicy() { 329 return backoffPolicy; 330 } 331 332 @Override 333 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) { 334 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) { 335 336 @Override 337 public AsyncTable<AdvancedScanResultConsumer> build() { 338 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 339 } 340 }; 341 } 342 343 @Override 344 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, 345 ExecutorService pool) { 346 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { 347 348 @Override 349 public AsyncTable<ScanResultConsumer> build() { 350 RawAsyncTableImpl rawTable = 351 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 352 return new AsyncTableImpl(rawTable, pool); 353 } 354 }; 355 } 356 357 @Override 358 public AsyncAdminBuilder getAdminBuilder() { 359 return new AsyncAdminBuilderBase(connConf) { 360 @Override 361 public AsyncAdmin build() { 362 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 363 } 364 }; 365 } 366 367 @Override 368 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { 369 return new AsyncAdminBuilderBase(connConf) { 370 @Override 371 public AsyncAdmin build() { 372 RawAsyncHBaseAdmin rawAdmin = 373 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 374 return new AsyncHBaseAdmin(rawAdmin, pool); 375 } 376 }; 377 } 378 379 @Override 380 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { 381 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER); 382 } 383 384 @Override 385 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, 386 ExecutorService pool) { 387 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), 388 RETRY_TIMER); 389 } 390 391 private Hbck getHbckInternal(ServerName masterServer) { 392 Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName()); 393 // we will not create a new connection when creating a new protobuf stub, and for hbck there 394 // will be no performance consideration, so for simplification we will create a new stub every 395 // time instead of caching the stub here. 396 return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( 397 rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); 398 } 399 400 @Override 401 public CompletableFuture<Hbck> getHbck() { 402 return TraceUtil.tracedFuture(() -> { 403 CompletableFuture<Hbck> future = new CompletableFuture<>(); 404 addListener(registry.getActiveMaster(), (sn, error) -> { 405 if (error != null) { 406 future.completeExceptionally(error); 407 } else { 408 future.complete(getHbckInternal(sn)); 409 } 410 }); 411 return future; 412 }, "AsyncConnection.getHbck"); 413 } 414 415 @Override 416 public Hbck getHbck(ServerName masterServer) { 417 return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck"); 418 } 419 420 @Override 421 public void clearRegionLocationCache() { 422 locator.clearCache(); 423 } 424 425 Optional<MetricsConnection> getConnectionMetrics() { 426 return metrics; 427 } 428}