001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; 021import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; 022import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY; 023import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 024 025import java.io.IOException; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicReference; 032import org.apache.commons.io.IOUtils; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.MasterNotRunningException; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.TableName; 037import org.apache.hadoop.hbase.ipc.RpcClient; 038import org.apache.hadoop.hbase.ipc.RpcClientFactory; 039import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 040import org.apache.hadoop.hbase.security.User; 041import org.apache.hadoop.hbase.util.CollectionUtils; 042import org.apache.hadoop.hbase.util.Threads; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 048import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; 049 050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 054 055/** 056 * The implementation of AsyncConnection. 057 */ 058@InterfaceAudience.Private 059class AsyncConnectionImpl implements AsyncConnection { 060 061 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class); 062 063 @VisibleForTesting 064 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( 065 Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS); 066 067 private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; 068 069 private final Configuration conf; 070 071 final AsyncConnectionConfiguration connConf; 072 073 private final User user; 074 075 final AsyncRegistry registry; 076 077 private final int rpcTimeout; 078 079 private final RpcClient rpcClient; 080 081 final RpcControllerFactory rpcControllerFactory; 082 083 private final boolean hostnameCanChange; 084 085 private final AsyncRegionLocator locator; 086 087 final AsyncRpcRetryingCallerFactory callerFactory; 088 089 private final NonceGenerator nonceGenerator; 090 091 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>(); 092 private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>(); 093 094 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>(); 095 096 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture = 097 new AtomicReference<>(); 098 099 public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, 100 User user) { 101 this.conf = conf; 102 this.user = user; 103 this.connConf = new AsyncConnectionConfiguration(conf); 104 this.registry = registry; 105 this.rpcClient = RpcClientFactory.createClient(conf, clusterId); 106 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf); 107 this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); 108 this.rpcTimeout = 109 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs())); 110 this.locator = new AsyncRegionLocator(this, RETRY_TIMER); 111 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER); 112 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { 113 nonceGenerator = PerClientRandomNonceGenerator.get(); 114 } else { 115 nonceGenerator = NO_NONCE_GENERATOR; 116 } 117 } 118 119 @Override 120 public Configuration getConfiguration() { 121 return conf; 122 } 123 124 @Override 125 public void close() { 126 IOUtils.closeQuietly(rpcClient); 127 IOUtils.closeQuietly(registry); 128 } 129 130 @Override 131 public AsyncTableRegionLocator getRegionLocator(TableName tableName) { 132 return new AsyncTableRegionLocatorImpl(tableName, locator); 133 } 134 135 // we will override this method for testing retry caller, so do not remove this method. 136 AsyncRegionLocator getLocator() { 137 return locator; 138 } 139 140 // ditto 141 @VisibleForTesting 142 public NonceGenerator getNonceGenerator() { 143 return nonceGenerator; 144 } 145 146 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException { 147 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 148 } 149 150 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { 151 return CollectionUtils.computeIfAbsentEx(rsStubs, 152 getStubKey(ClientService.Interface.class.getSimpleName(), serverName, hostnameCanChange), 153 () -> createRegionServerStub(serverName)); 154 } 155 156 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException { 157 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 158 } 159 160 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException { 161 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout)); 162 } 163 164 AdminService.Interface getAdminStub(ServerName serverName) throws IOException { 165 return CollectionUtils.computeIfAbsentEx(adminSubs, 166 getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange), 167 () -> createAdminServerStub(serverName)); 168 } 169 170 CompletableFuture<MasterService.Interface> getMasterStub() { 171 return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> { 172 CompletableFuture<MasterService.Interface> future = new CompletableFuture<>(); 173 addListener(registry.getMasterAddress(), (addr, error) -> { 174 if (error != null) { 175 future.completeExceptionally(error); 176 } else if (addr == null) { 177 future.completeExceptionally(new MasterNotRunningException( 178 "ZooKeeper available but no active master location found")); 179 } else { 180 LOG.debug("The fetched master address is {}", addr); 181 try { 182 future.complete(createMasterStub(addr)); 183 } catch (IOException e) { 184 future.completeExceptionally(e); 185 } 186 } 187 188 }); 189 return future; 190 }, stub -> true, "master stub"); 191 } 192 193 void clearMasterStubCache(MasterService.Interface stub) { 194 masterStub.compareAndSet(stub, null); 195 } 196 197 @Override 198 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) { 199 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) { 200 201 @Override 202 public AsyncTable<AdvancedScanResultConsumer> build() { 203 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 204 } 205 }; 206 } 207 208 @Override 209 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, 210 ExecutorService pool) { 211 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) { 212 213 @Override 214 public AsyncTable<ScanResultConsumer> build() { 215 RawAsyncTableImpl rawTable = 216 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this); 217 return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool); 218 } 219 }; 220 } 221 222 @Override 223 public AsyncAdminBuilder getAdminBuilder() { 224 return new AsyncAdminBuilderBase(connConf) { 225 @Override 226 public AsyncAdmin build() { 227 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 228 } 229 }; 230 } 231 232 @Override 233 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) { 234 return new AsyncAdminBuilderBase(connConf) { 235 @Override 236 public AsyncAdmin build() { 237 RawAsyncHBaseAdmin rawAdmin = 238 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this); 239 return new AsyncHBaseAdmin(rawAdmin, pool); 240 } 241 }; 242 } 243 244 @Override 245 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) { 246 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER); 247 } 248 249 @Override 250 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, 251 ExecutorService pool) { 252 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool), 253 RETRY_TIMER); 254 } 255 256 @Override 257 public CompletableFuture<Hbck> getHbck() { 258 CompletableFuture<Hbck> future = new CompletableFuture<>(); 259 addListener(registry.getMasterAddress(), (sn, error) -> { 260 if (error != null) { 261 future.completeExceptionally(error); 262 } else { 263 try { 264 future.complete(getHbck(sn)); 265 } catch (IOException e) { 266 future.completeExceptionally(e); 267 } 268 } 269 }); 270 return future; 271 } 272 273 @Override 274 public Hbck getHbck(ServerName masterServer) throws IOException { 275 // we will not create a new connection when creating a new protobuf stub, and for hbck there 276 // will be no performance consideration, so for simplification we will create a new stub every 277 // time instead of caching the stub here. 278 return new HBaseHbck(MasterProtos.HbckService.newBlockingStub( 279 rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory); 280 } 281}