001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Set; 025import java.util.concurrent.CompletableFuture; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.ipc.RpcClient; 030import org.apache.hadoop.hbase.ipc.RpcClientFactory; 031import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hadoop.hbase.util.FutureUtils; 034import org.apache.hadoop.hbase.util.IOExceptionSupplier; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 040import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 043 044/** 045 * A class for creating {@link RpcClient} and related stubs used by 046 * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap nodes to get the 047 * cluster id first, before creating the final {@link RpcClient} and related stubs. 048 * <p> 049 * See HBASE-25051 for more details. 050 */ 051@InterfaceAudience.Private 052class ConnectionRegistryRpcStubHolder implements Closeable { 053 054 private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class); 055 056 private final Configuration conf; 057 058 // used for getting cluster id 059 private final Configuration noAuthConf; 060 061 private final User user; 062 063 private final RpcControllerFactory rpcControllerFactory; 064 065 private final Set<ServerName> bootstrapNodes; 066 067 private final int rpcTimeoutMs; 068 069 private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub; 070 071 private volatile RpcClient rpcClient; 072 073 private CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> addr2StubFuture; 074 075 ConnectionRegistryRpcStubHolder(Configuration conf, User user, 076 RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) { 077 this.conf = conf; 078 if (User.isHBaseSecurityEnabled(conf)) { 079 this.noAuthConf = new Configuration(conf); 080 this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); 081 } else { 082 this.noAuthConf = conf; 083 } 084 this.user = user; 085 this.rpcControllerFactory = rpcControllerFactory; 086 this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes); 087 this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, 088 conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 089 } 090 091 private ImmutableMap<ServerName, ClientMetaService.Interface> createStubs(RpcClient rpcClient, 092 Collection<ServerName> addrs) { 093 LOG.debug("Going to use new servers to create stubs: {}", addrs); 094 Preconditions.checkNotNull(addrs); 095 ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder = 096 ImmutableMap.builderWithExpectedSize(addrs.size()); 097 for (ServerName masterAddr : addrs) { 098 builder.put(masterAddr, 099 ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); 100 } 101 return builder.build(); 102 } 103 104 private CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> 105 fetchClusterIdAndCreateStubs() { 106 CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> future = 107 new CompletableFuture<>(); 108 addr2StubFuture = future; 109 FutureUtils.addListener( 110 new ClusterIdFetcher(noAuthConf, user, rpcControllerFactory, bootstrapNodes).fetchClusterId(), 111 (clusterId, error) -> { 112 synchronized (ConnectionRegistryRpcStubHolder.this) { 113 if (error != null) { 114 addr2StubFuture.completeExceptionally(error); 115 } else { 116 RpcClient c = RpcClientFactory.createClient(conf, clusterId); 117 ImmutableMap<ServerName, ClientMetaService.Interface> m = 118 createStubs(c, bootstrapNodes); 119 rpcClient = c; 120 addr2Stub = m; 121 addr2StubFuture.complete(m); 122 } 123 addr2StubFuture = null; 124 } 125 }); 126 // here we must use the local variable future instead of addr2StubFuture, as the above listener 127 // could be executed directly in the same thread(if the future completes quick enough), since 128 // the synchronized lock is reentrant, it could set addr2StubFuture to null in the end, so when 129 // arriving here the addr2StubFuture could be null. 130 return future; 131 } 132 133 CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> getStubs() { 134 ImmutableMap<ServerName, ClientMetaService.Interface> s = this.addr2Stub; 135 if (s != null) { 136 return CompletableFuture.completedFuture(s); 137 } 138 synchronized (this) { 139 s = this.addr2Stub; 140 if (s != null) { 141 return CompletableFuture.completedFuture(s); 142 } 143 if (addr2StubFuture != null) { 144 return addr2StubFuture; 145 } 146 return fetchClusterIdAndCreateStubs(); 147 } 148 } 149 150 void refreshStubs(IOExceptionSupplier<Collection<ServerName>> fetchEndpoints) throws IOException { 151 // There is no actual call yet so we have not initialize the rpc client and related stubs yet, 152 // give up refreshing 153 if (addr2Stub == null) { 154 LOG.debug("Skip refreshing stubs as we have not initialized rpc client yet"); 155 return; 156 } 157 LOG.debug("Going to refresh stubs"); 158 assert rpcClient != null; 159 addr2Stub = createStubs(rpcClient, fetchEndpoints.get()); 160 } 161 162 @Override 163 public void close() { 164 if (rpcClient != null) { 165 rpcClient.close(); 166 } 167 } 168}