001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 022import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; 023import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 024 025import java.io.IOException; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.commons.io.IOUtils; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.MasterNotRunningException; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.ipc.HBaseRpcController; 038import org.apache.hadoop.hbase.ipc.RpcClient; 039import org.apache.hadoop.hbase.ipc.RpcClientFactory; 040import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 041import org.apache.hadoop.hbase.security.User; 042import org.apache.hadoop.hbase.util.CollectionUtils; 043import org.apache.hadoop.hbase.util.Threads; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 049import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 050import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 051 052import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 058 059/** 060 * The implementation of AsyncConnection. 061 */ 062@InterfaceAudience.Private 063class AsyncConnectionImpl implements AsyncConnection { 064 065 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); 066 067 @VisibleForTesting 068 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( 069 Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); 070 071 private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; 072 073 private final Configuration conf; 074 075 final AsyncConnectionConfiguration connConf; 076 077 private final User user; 078 079 final AsyncRegistry registry; 080 081 private final int rpcTimeout; 082 083 private final RpcClient rpcClient; 084 085 final RpcControllerFactory rpcControllerFactory; 086 087 private final boolean hostnameCanChange; 088 089 private final AsyncRegionLocator locator; 090 091 final AsyncRpcRetryingCallerFactory callerFactory; 092 093 private final NonceGenerator nonceGenerator; 094 095 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); 096 private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>(); 097 098 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); 099 100 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture = 101 new AtomicReference<>(); 102 103 public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, 104 User user) { 105 this.conf = conf; 106 this.user = user; 107 this.connConf = new AsyncConnectionConfiguration(conf); 108 this.registry = registry; 109 this.rpcClient = RpcClientFactory.createClient(conf, clusterId); 110 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 111 this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); 112 this.rpcTimeout = 113 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); 114 this.locator = new AsyncRegionLocator(this, RETRY_TIMER); 115 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); 116 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { 117 nonceGenerator = PerClientRandomNonceGenerator.get(); 118 } else { 119 nonceGenerator = NO_NONCE_GENERATOR; 120 } 121 } 122 123 @Override 124 public Configuration getConfiguration() { 125 return conf; 126 } 127 128 @Override 129 public void close() { 130 IOUtils.closeQuietly(rpcClient); 131 IOUtils.closeQuietly(registry); 132 } 133 134 @Override 135 public AsyncTableRegionLocator getRegionLocator(TableName tableName) { 136 return new AsyncTableRegionLocatorImpl(tableName, locator); 137 } 138 139 // we will override this method for testing retry caller, so do not remove this method. 140 AsyncRegionLocator getLocator() { 141 return locator; 142 } 143 144 // ditto 145 @VisibleForTesting 146 public NonceGenerator getNonceGenerator() { 147 return nonceGenerator; 148 } 149 150 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { 151 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 152 } 153 154 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 155 return CollectionUtils.computeIfAbsentEx(rsStubs, 156 getStubKey(ClientService.Interface.class.getSimpleName(), serverName, hostnameCanChange), 157 () -> createRegionServerStub(serverName)); 158 } 159 160 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException { 161 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 162 } 163 164 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException { 165 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 166 } 167 168 AdminService.Interface getAdminStub(ServerName serverName) throws IOException { 169 return CollectionUtils.computeIfAbsentEx(adminSubs, 170 getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange), 171 () -> createAdminServerStub(serverName)); 172 } 173 174 private void makeMasterStub(CompletableFuture<MasterService.Interface> future) { 175 addListener(registry.getMasterAddress(), (sn, error) -> { 176 if (sn == null) { 177 String msg = "ZooKeeper available but no active master location found"; 178 LOG.info(msg); 179 this.masterStubMakeFuture.getAndSet(null) 180 .completeExceptionally(new MasterNotRunningException(msg)); 181 return; 182 } 183 try { 184 MasterService.Interface stub = createMasterStub(sn); 185 HBaseRpcController controller = getRpcController(); 186 stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(), 187 new RpcCallback<IsMasterRunningResponse>() { 188 @Override 189 public void run(IsMasterRunningResponse resp) { 190 if (controller.failed() || resp == null || 191 (resp != null && !resp.getIsMasterRunning())) { 192 masterStubMakeFuture.getAndSet(null).completeExceptionally( 193 new MasterNotRunningException("Master connection is not running anymore")); 194 } else { 195 masterStub.set(stub); 196 masterStubMakeFuture.set(null); 197 future.complete(stub); 198 } 199 } 200 }); 201 } catch (IOException e) { 202 this.masterStubMakeFuture.getAndSet(null) 203 .completeExceptionally(new IOException("Failed to create async master stub", e)); 204 } 205 }); 206 } 207 208 CompletableFuture<MasterService.Interface> getMasterStub() { 209 MasterService.Interface masterStub = this.masterStub.get(); 210 211 if (masterStub == null) { 212 for (;;) { 213 if (this.masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) { 214 CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get(); 215 makeMasterStub(future); 216 } else { 217 CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get(); 218 if (future != null) { 219 return future; 220 } 221 } 222 } 223 } 224 225 for (;;) { 226 if (masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) { 227 CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get(); 228 HBaseRpcController controller = getRpcController(); 229 masterStub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(), 230 new RpcCallback<IsMasterRunningResponse>() { 231 @Override 232 public void run(IsMasterRunningResponse resp) { 233 if (controller.failed() || resp == null || 234 (resp != null && !resp.getIsMasterRunning())) { 235 makeMasterStub(future); 236 } else { 237 future.complete(masterStub); 238 } 239 } 240 }); 241 } else { 242 CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get(); 243 if (future != null) { 244 return future; 245 } 246 } 247 } 248 } 249 250 private HBaseRpcController getRpcController() { 251 HBaseRpcController controller = this.rpcControllerFactory.newController(); 252 controller.setCallTimeout((int) TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); 253 return controller; 254 } 255 256 @Override 257 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) { 258 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) { 259 260 @Override 261 public AsyncTable<AdvancedScanResultConsumer> build() { 262 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 263 } 264 }; 265 } 266 267 @Override 268 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, 269 ExecutorService pool) { 270 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { 271 272 @Override 273 public AsyncTable<ScanResultConsumer> build() { 274 RawAsyncTableImpl rawTable = 275 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 276 return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool); 277 } 278 }; 279 } 280 281 @Override 282 public AsyncAdminBuilder getAdminBuilder() { 283 return new AsyncAdminBuilderBase(connConf) { 284 @Override 285 public AsyncAdmin build() { 286 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 287 } 288 }; 289 } 290 291 @Override 292 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { 293 return new AsyncAdminBuilderBase(connConf) { 294 @Override 295 public AsyncAdmin build() { 296 RawAsyncHBaseAdmin rawAdmin = 297 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 298 return new AsyncHBaseAdmin(rawAdmin, pool); 299 } 300 }; 301 } 302 303 @Override 304 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { 305 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER); 306 } 307 308 @Override 309 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, 310 ExecutorService pool) { 311 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), 312 RETRY_TIMER); 313 } 314 315 @Override 316 public CompletableFuture<Hbck> getHbck() { 317 CompletableFuture<Hbck> future = new CompletableFuture<>(); 318 addListener(registry.getMasterAddress(), (sn, error) -> { 319 if (error != null) { 320 future.completeExceptionally(error); 321 } else { 322 try { 323 future.complete(getHbck(sn)); 324 } catch (IOException e) { 325 future.completeExceptionally(e); 326 } 327 } 328 }); 329 return future; 330 } 331 332 @Override 333 public Hbck getHbck(ServerName masterServer) throws IOException { 334 // we will not create a new connection when creating a new protobuf stub, and for hbck there 335 // will be no performance consideration, so for simplification we will create a new stub every 336 // time instead of caching the stub here. 337 return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( 338 rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); 339 } 340}