001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED; 021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT; 022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS; 023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS; 024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; 027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; 028import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 029 030import java.io.IOException; 031import java.util.Optional; 032import java.util.concurrent.CompletableFuture; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ConcurrentMap; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicReference; 038import org.apache.commons.io.IOUtils; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.AuthUtil; 041import org.apache.hadoop.hbase.ChoreService; 042import org.apache.hadoop.hbase.MasterNotRunningException; 043import org.apache.hadoop.hbase.ServerName; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; 046import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; 047import org.apache.hadoop.hbase.ipc.RpcClient; 048import org.apache.hadoop.hbase.ipc.RpcClientFactory; 049import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 055import org.apache.hadoop.hbase.util.ConcurrentMapUtils; 056import org.apache.hadoop.hbase.util.Threads; 057import org.apache.hadoop.security.UserGroupInformation; 058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 059import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064/** 065 * The implementation of AsyncConnection. 066 */ 067@InterfaceAudience.Private 068class AsyncConnectionImpl implements AsyncConnection { 069 070 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); 071 072 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( 073 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true) 074 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 075 10, TimeUnit.MILLISECONDS); 076 077 private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; 078 079 private final Configuration conf; 080 081 final AsyncConnectionConfiguration connConf; 082 083 private final User user; 084 085 final ConnectionRegistry registry; 086 087 private final int rpcTimeout; 088 089 private final RpcClient rpcClient; 090 091 final RpcControllerFactory rpcControllerFactory; 092 093 private final boolean hostnameCanChange; 094 095 private final AsyncRegionLocator locator; 096 097 final AsyncRpcRetryingCallerFactory callerFactory; 098 099 private final NonceGenerator nonceGenerator; 100 101 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); 102 private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>(); 103 104 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); 105 106 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture = 107 new AtomicReference<>(); 108 109 private final Optional<ServerStatisticTracker> stats; 110 private final ClientBackoffPolicy backoffPolicy; 111 112 private ChoreService choreService; 113 114 private volatile boolean closed = false; 115 116 private final Optional<MetricsConnection> metrics; 117 118 private final ClusterStatusListener clusterStatusListener; 119 120 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, 121 User user) { 122 this.conf = conf; 123 this.user = user; 124 125 if (user.isLoginFromKeytab()) { 126 spawnRenewalChore(user.getUGI()); 127 } 128 this.connConf = new AsyncConnectionConfiguration(conf); 129 this.registry = registry; 130 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) { 131 this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null)); 132 } else { 133 this.metrics = Optional.empty(); 134 } 135 this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null)); 136 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 137 this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); 138 this.rpcTimeout = 139 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); 140 this.locator = new AsyncRegionLocator(this, RETRY_TIMER); 141 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); 142 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { 143 nonceGenerator = PerClientRandomNonceGenerator.get(); 144 } else { 145 nonceGenerator = NO_NONCE_GENERATOR; 146 } 147 this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf)); 148 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); 149 ClusterStatusListener listener = null; 150 if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) { 151 // TODO: this maybe a blocking operation, better to create it outside the constructor and pass 152 // it in, just like clusterId. Not a big problem for now as the default value is false. 153 Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass( 154 STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); 155 if (listenerClass == null) { 156 LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS); 157 } else { 158 try { 159 listener = new ClusterStatusListener( 160 new ClusterStatusListener.DeadServerHandler() { 161 @Override 162 public void newDead(ServerName sn) { 163 locator.clearCache(sn); 164 rpcClient.cancelConnections(sn); 165 } 166 }, conf, listenerClass); 167 } catch (IOException e) { 168 LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e); 169 } 170 } 171 } 172 this.clusterStatusListener = listener; 173 } 174 175 private void spawnRenewalChore(final UserGroupInformation user) { 176 ChoreService service = getChoreService(); 177 service.scheduleChore(AuthUtil.getAuthRenewalChore(user)); 178 } 179 180 /** 181 * If choreService has not been created yet, create the ChoreService. 182 * @return ChoreService 183 */ 184 synchronized ChoreService getChoreService() { 185 if (choreService == null) { 186 choreService = new ChoreService("AsyncConn Chore Service"); 187 } 188 return choreService; 189 } 190 191 @Override 192 public Configuration getConfiguration() { 193 return conf; 194 } 195 196 @Override 197 public void close() { 198 // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a 199 // simple volatile flag. 200 if (closed) { 201 return; 202 } 203 LOG.info("Connection has been closed by {}.", Thread.currentThread().getName()); 204 if(LOG.isDebugEnabled()){ 205 logCallStack(Thread.currentThread().getStackTrace()); 206 } 207 IOUtils.closeQuietly(clusterStatusListener); 208 IOUtils.closeQuietly(rpcClient); 209 IOUtils.closeQuietly(registry); 210 if (choreService != null) { 211 choreService.shutdown(); 212 } 213 metrics.ifPresent(MetricsConnection::shutdown); 214 closed = true; 215 } 216 217 private void logCallStack(StackTraceElement[] stackTraceElements) { 218 StringBuilder stackBuilder = new StringBuilder("Call stack:"); 219 for (StackTraceElement element : stackTraceElements) { 220 stackBuilder.append("\n at "); 221 stackBuilder.append(element); 222 } 223 stackBuilder.append("\n"); 224 LOG.debug(stackBuilder.toString()); 225 } 226 227 @Override 228 public boolean isClosed() { 229 return closed; 230 } 231 232 @Override 233 public AsyncTableRegionLocator getRegionLocator(TableName tableName) { 234 return new AsyncTableRegionLocatorImpl(tableName, this); 235 } 236 237 // we will override this method for testing retry caller, so do not remove this method. 238 AsyncRegionLocator getLocator() { 239 return locator; 240 } 241 242 // ditto 243 public NonceGenerator getNonceGenerator() { 244 return nonceGenerator; 245 } 246 247 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { 248 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 249 } 250 251 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 252 return ConcurrentMapUtils.computeIfAbsentEx(rsStubs, 253 getStubKey(ClientService.getDescriptor().getName(), serverName, hostnameCanChange), 254 () -> createRegionServerStub(serverName)); 255 } 256 257 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException { 258 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 259 } 260 261 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException { 262 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 263 } 264 265 AdminService.Interface getAdminStub(ServerName serverName) throws IOException { 266 return ConcurrentMapUtils.computeIfAbsentEx(adminSubs, 267 getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange), 268 () -> createAdminServerStub(serverName)); 269 } 270 271 CompletableFuture<MasterService.Interface> getMasterStub() { 272 return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { 273 CompletableFuture<MasterService.Interface> future = new CompletableFuture<>(); 274 addListener(registry.getActiveMaster(), (addr, error) -> { 275 if (error != null) { 276 future.completeExceptionally(error); 277 } else if (addr == null) { 278 future.completeExceptionally(new MasterNotRunningException( 279 "ZooKeeper available but no active master location found")); 280 } else { 281 LOG.debug("The fetched master address is {}", addr); 282 try { 283 future.complete(createMasterStub(addr)); 284 } catch (IOException e) { 285 future.completeExceptionally(e); 286 } 287 } 288 289 }); 290 return future; 291 }, stub -> true, "master stub"); 292 } 293 294 void clearMasterStubCache(MasterService.Interface stub) { 295 masterStub.compareAndSet(stub, null); 296 } 297 298 Optional<ServerStatisticTracker> getStatisticsTracker() { 299 return stats; 300 } 301 302 ClientBackoffPolicy getBackoffPolicy() { 303 return backoffPolicy; 304 } 305 306 @Override 307 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) { 308 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) { 309 310 @Override 311 public AsyncTable<AdvancedScanResultConsumer> build() { 312 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 313 } 314 }; 315 } 316 317 @Override 318 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, 319 ExecutorService pool) { 320 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { 321 322 @Override 323 public AsyncTable<ScanResultConsumer> build() { 324 RawAsyncTableImpl rawTable = 325 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 326 return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool); 327 } 328 }; 329 } 330 331 @Override 332 public AsyncAdminBuilder getAdminBuilder() { 333 return new AsyncAdminBuilderBase(connConf) { 334 @Override 335 public AsyncAdmin build() { 336 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 337 } 338 }; 339 } 340 341 @Override 342 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { 343 return new AsyncAdminBuilderBase(connConf) { 344 @Override 345 public AsyncAdmin build() { 346 RawAsyncHBaseAdmin rawAdmin = 347 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 348 return new AsyncHBaseAdmin(rawAdmin, pool); 349 } 350 }; 351 } 352 353 @Override 354 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { 355 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER); 356 } 357 358 @Override 359 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, 360 ExecutorService pool) { 361 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), 362 RETRY_TIMER); 363 } 364 365 @Override 366 public CompletableFuture<Hbck> getHbck() { 367 CompletableFuture<Hbck> future = new CompletableFuture<>(); 368 addListener(registry.getActiveMaster(), (sn, error) -> { 369 if (error != null) { 370 future.completeExceptionally(error); 371 } else { 372 try { 373 future.complete(getHbck(sn)); 374 } catch (IOException e) { 375 future.completeExceptionally(e); 376 } 377 } 378 }); 379 return future; 380 } 381 382 @Override 383 public Hbck getHbck(ServerName masterServer) throws IOException { 384 // we will not create a new connection when creating a new protobuf stub, and for hbck there 385 // will be no performance consideration, so for simplification we will create a new stub every 386 // time instead of caching the stub here. 387 return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( 388 rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); 389 } 390 391 @Override 392 public void clearRegionLocationCache() { 393 locator.clearCache(); 394 } 395 396 Optional<MetricsConnection> getConnectionMetrics() { 397 return metrics; 398 } 399}