/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;

/**
 * Base class for rpc based connection registry implementation.
 * <p/>
 * The implementation needs a bootstrap node list in configuration, and then it will use the methods
 * in {@link ClientMetaService} to refresh the connection registry end points.
 * <p/>
 * It also supports hedged reads, the default fan out value is 2.
 * <p/>
 * For the actual configuration names, see javadoc of sub classes.
 */
@InterfaceAudience.Private
abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {

  /** Default value for the fan out of hedged requests. **/
  public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;

  private final int hedgedReadFanOut;

  // Configured list of end points to probe the meta information from.
  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;

  // RPC client used to talk to the masters.
  private final RpcClient rpcClient;
  private final RpcControllerFactory rpcControllerFactory;
  private final int rpcTimeoutMs;

  // May be null when periodic refresh is disabled via configuration; always null-check before use.
  private final RegistryEndpointsRefresher registryEndpointRefresher;

  protected AbstractRpcBasedConnectionRegistry(Configuration conf,
    String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
    String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
    throws IOException {
    this.hedgedReadFanOut =
      Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
    // this through the master registry...
    // This is a problem as we will use the cluster id to determine the authentication method
    rpcClient = RpcClientFactory.createClient(conf, null);
    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    populateStubs(getBootstrapNodes(conf));
    // could return null here if refresh interval is less than zero
    registryEndpointRefresher =
      RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
        refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
  }

  /** Returns the initial set of registry end points to connect to, read from configuration. */
  protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;

  /** Fetches the current set of registry end points from the cluster, for periodic refresh. */
  protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();

  private void refreshStubs() throws IOException {
    populateStubs(FutureUtils.get(fetchEndpoints()));
  }

  /**
   * Rebuilds the address-to-stub map for the given end points. Publishes the new map with a
   * single volatile write so concurrent readers see a consistent snapshot.
   */
  private void populateStubs(Set<ServerName> addrs) throws IOException {
    Preconditions.checkNotNull(addrs);
    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
      ImmutableMap.builderWithExpectedSize(addrs.size());
    User user = User.getCurrent();
    for (ServerName masterAddr : addrs) {
      builder.put(masterAddr,
        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
    }
    addr2Stub = builder.build();
  }

  /**
   * For describing the actual asynchronous rpc call.
   * <p/>
   * Typically, you can use lambda expression to implement this interface as
   *
   * <pre>
   * (c, s, d) -&gt; s.xxx(c, your request here, d)
   * </pre>
   */
  @FunctionalInterface
  protected interface Callable<T> {
    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
  }

  /**
   * Issues a single asynchronous rpc against the given stub. On a connection-level failure we also
   * kick off a refresh of the registry end points, since the node we talked to may be gone.
   */
  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
    Callable<T> callable) {
    HBaseRpcController controller = rpcControllerFactory.newController();
    CompletableFuture<T> future = new CompletableFuture<>();
    callable.call(controller, stub, resp -> {
      if (controller.failed()) {
        IOException failureReason = controller.getFailed();
        future.completeExceptionally(failureReason);
        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
          // RPC has failed, trigger a refresh of end points. We can have some spurious
          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
          // The refresher is null when periodic refresh is disabled (see constructor), so guard
          // against NPE here, the same way close() does.
          if (registryEndpointRefresher != null) {
            registryEndpointRefresher.refreshNow();
          }
        }
      } else {
        future.complete(resp);
      }
    });
    return future;
  }

  private IOException badResponse(String debug) {
    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
  }

  /**
   * send requests concurrently to hedgedReadsFanout end points. If any of the requests succeeds,
   * we will complete the future and quit. If all the requests in one round are failed, we will
   * start another round to send requests concurrently to hedgedReadsFanout end points. If all end
   * points have been tried and all of them are failed, we will fail the future.
   */
  private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
    List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
      addListener(call(stubs.get(i), callable), (r, e) -> {
        // a simple check to skip all the later operations earlier
        if (future.isDone()) {
          return;
        }
        if (e == null && !isValidResp.test(r)) {
          e = badResponse(debug);
        }
        if (e != null) {
          // make sure when remaining reaches 0 we have all exceptions in the errors queue
          errors.add(e);
          if (remaining.decrementAndGet() == 0) {
            if (endIndexExclusive == stubs.size()) {
              // we are done, complete the future with exception
              RetriesExhaustedException ex =
                new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
              future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
            } else {
              groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
                errors);
            }
          }
        } else {
          // do not need to decrement the counter any more as we have already finished the future.
          future.complete(r);
        }
      });
    }
  }

  /**
   * Sends the given rpc to the registry end points with hedged reads, in random order, and
   * completes the returned future with the first valid response.
   */
  protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
    Predicate<T> isValidResp, String debug) {
    ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
    Set<ServerName> servers = addr2StubRef.keySet();
    List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
    Collections.shuffle(stubs, ThreadLocalRandom.current());
    CompletableFuture<T> future = new CompletableFuture<>();
    groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
      new ConcurrentLinkedQueue<>());
    return future;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  Set<ServerName> getParsedServers() {
    return addr2Stub.keySet();
  }

  /**
   * Simple helper to transform the result of getMetaRegionLocations() rpc.
   */
  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
    List<HRegionLocation> regionLocations = new ArrayList<>();
    resp.getMetaLocationsList()
      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
    return new RegionLocations(regionLocations);
  }

  @Override
  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
    return tracedFuture(
      () -> this
        .<GetMetaRegionLocationsResponse> call(
          (c, s, d) -> s.getMetaRegionLocations(c,
            GetMetaRegionLocationsRequest.getDefaultInstance(), d),
          r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
        .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations),
      getClass().getSimpleName() + ".getMetaRegionLocations");
  }

  @Override
  public CompletableFuture<String> getClusterId() {
    return tracedFuture(
      () -> this
        .<GetClusterIdResponse> call(
          (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
          GetClusterIdResponse::hasClusterId, "getClusterId()")
        .thenApply(GetClusterIdResponse::getClusterId),
      getClass().getSimpleName() + ".getClusterId");
  }

  @Override
  public CompletableFuture<ServerName> getActiveMaster() {
    return tracedFuture(
      () -> this
        .<GetActiveMasterResponse> call(
          (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
          GetActiveMasterResponse::hasServerName, "getActiveMaster()")
        .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())),
      getClass().getSimpleName() + ".getActiveMaster");
  }

  @Override
  public void close() {
    trace(() -> {
      if (registryEndpointRefresher != null) {
        registryEndpointRefresher.stop();
      }
      if (rpcClient != null) {
        rpcClient.close();
      }
    }, getClass().getSimpleName() + ".close");
  }
}