001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.trace.TraceUtil.trace; 021import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023 024import com.google.errorprone.annotations.RestrictedApi; 025import java.io.IOException; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.ConcurrentLinkedQueue; 032import java.util.concurrent.ThreadLocalRandom; 033import java.util.concurrent.atomic.AtomicInteger; 034import java.util.function.Predicate; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.HRegionLocation; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 040import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; 041import org.apache.hadoop.hbase.ipc.HBaseRpcController; 042import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 043import org.apache.hadoop.hbase.security.User; 044import org.apache.hadoop.hbase.util.FutureUtils; 045import org.apache.yetus.audience.InterfaceAudience; 046 047import org.apache.hbase.thirdparty.com.google.protobuf.Message; 048import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 049 050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse; 058 059/** 060 * Base class for rpc based connection registry implementation. 061 * <p/> 062 * The implementation needs a bootstrap node list in configuration, and then it will use the methods 063 * in {@link ClientMetaService} to refresh the connection registry end points. 064 * <p/> 065 * It also supports hedged reads, the default fan out value is 2. 066 * <p/> 067 * For the actual configuration names, see javadoc of sub classes. 068 */ 069@InterfaceAudience.Private 070abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry { 071 072 /** Default value for the fan out of hedged requests. **/ 073 public static final int HEDGED_REQS_FANOUT_DEFAULT = 2; 074 075 private final int hedgedReadFanOut; 076 077 // RPC client used to talk to the masters. 078 private final ConnectionRegistryRpcStubHolder rpcStubHolder; 079 private final RpcControllerFactory rpcControllerFactory; 080 081 private final RegistryEndpointsRefresher registryEndpointRefresher; 082 083 protected AbstractRpcBasedConnectionRegistry(Configuration conf, User user, 084 String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName, 085 String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName) 086 throws IOException { 087 this.hedgedReadFanOut = 088 Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT)); 089 rpcControllerFactory = RpcControllerFactory.instantiate(conf); 090 rpcStubHolder = new ConnectionRegistryRpcStubHolder(conf, user, rpcControllerFactory, 091 getBootstrapNodes(conf)); 092 // could return null here is refresh interval is less than zero 093 registryEndpointRefresher = 094 RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName, 095 refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs); 096 } 097 098 protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException; 099 100 protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints(); 101 102 private void refreshStubs() throws IOException { 103 rpcStubHolder.refreshStubs(() -> FutureUtils.get(fetchEndpoints())); 104 } 105 106 /** 107 * For describing the actual asynchronous rpc call. 108 * <p/> 109 * Typically, you can use lambda expression to implement this interface as 110 * 111 * <pre> 112 * (c, s, d) -> s.xxx(c, your request here, d) 113 * </pre> 114 */ 115 @FunctionalInterface 116 protected interface Callable<T> { 117 void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done); 118 } 119 120 private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub, 121 Callable<T> callable) { 122 HBaseRpcController controller = rpcControllerFactory.newController(); 123 CompletableFuture<T> future = new CompletableFuture<>(); 124 callable.call(controller, stub, resp -> { 125 if (controller.failed()) { 126 IOException failureReason = controller.getFailed(); 127 future.completeExceptionally(failureReason); 128 if (ClientExceptionsUtil.isConnectionException(failureReason)) { 129 // RPC has failed, trigger a refresh of end points. We can have some spurious 130 // refreshes, but that is okay since the RPC is not expensive and not in a hot path. 131 registryEndpointRefresher.refreshNow(); 132 } 133 } else { 134 future.complete(resp); 135 } 136 }); 137 return future; 138 } 139 140 private IOException badResponse(String debug) { 141 return new IOException(String.format("Invalid result for request %s. Will be retried", debug)); 142 } 143 144 /** 145 * send requests concurrently to hedgedReadsFanout end points. If any of the request is succeeded, 146 * we will complete the future and quit. If all the requests in one round are failed, we will 147 * start another round to send requests concurrently tohedgedReadsFanout end points. If all end 148 * points have been tried and all of them are failed, we will fail the future. 149 */ 150 private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers, 151 List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable, 152 Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) { 153 int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size()); 154 AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive); 155 for (int i = startIndexInclusive; i < endIndexExclusive; i++) { 156 addListener(call(stubs.get(i), callable), (r, e) -> { 157 // a simple check to skip all the later operations earlier 158 if (future.isDone()) { 159 return; 160 } 161 if (e == null && !isValidResp.test(r)) { 162 e = badResponse(debug); 163 } 164 if (e != null) { 165 // make sure when remaining reaches 0 we have all exceptions in the errors queue 166 errors.add(e); 167 if (remaining.decrementAndGet() == 0) { 168 if (endIndexExclusive == stubs.size()) { 169 // we are done, complete the future with exception 170 RetriesExhaustedException ex = 171 new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors)); 172 future.completeExceptionally(new MasterRegistryFetchException(servers, ex)); 173 } else { 174 groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug, 175 errors); 176 } 177 } 178 } else { 179 // do not need to decrement the counter any more as we have already finished the future. 180 future.complete(r); 181 } 182 }); 183 } 184 } 185 186 protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable, 187 Predicate<T> isValidResp, String debug) { 188 CompletableFuture<T> future = new CompletableFuture<>(); 189 FutureUtils.addListener(rpcStubHolder.getStubs(), (addr2Stub, error) -> { 190 if (error != null) { 191 future.completeExceptionally(error); 192 return; 193 } 194 Set<ServerName> servers = addr2Stub.keySet(); 195 List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2Stub.values()); 196 Collections.shuffle(stubs, ThreadLocalRandom.current()); 197 groupCall(future, servers, stubs, 0, callable, isValidResp, debug, 198 new ConcurrentLinkedQueue<>()); 199 }); 200 return future; 201 } 202 203 @RestrictedApi(explanation = "Should only be called in tests", link = "", 204 allowedOnPath = ".*/src/test/.*") 205 Set<ServerName> getParsedServers() throws IOException { 206 return FutureUtils.get(rpcStubHolder.getStubs()).keySet(); 207 } 208 209 /** 210 * Simple helper to transform the result of getMetaRegionLocations() rpc. 211 */ 212 private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) { 213 List<HRegionLocation> regionLocations = new ArrayList<>(); 214 resp.getMetaLocationsList() 215 .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location))); 216 return new RegionLocations(regionLocations); 217 } 218 219 @Override 220 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 221 return tracedFuture( 222 () -> this 223 .<GetMetaRegionLocationsResponse> call( 224 (c, s, d) -> s.getMetaRegionLocations(c, 225 GetMetaRegionLocationsRequest.getDefaultInstance(), d), 226 r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount") 227 .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations), 228 getClass().getSimpleName() + ".getMetaRegionLocations"); 229 } 230 231 @Override 232 public CompletableFuture<String> getClusterId() { 233 return tracedFuture( 234 () -> this 235 .<GetClusterIdResponse> call( 236 (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d), 237 GetClusterIdResponse::hasClusterId, "getClusterId()") 238 .thenApply(GetClusterIdResponse::getClusterId), 239 getClass().getSimpleName() + ".getClusterId"); 240 } 241 242 @Override 243 public CompletableFuture<ServerName> getActiveMaster() { 244 return tracedFuture( 245 () -> this 246 .<GetActiveMasterResponse> call( 247 (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d), 248 GetActiveMasterResponse::hasServerName, "getActiveMaster()") 249 .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())), 250 getClass().getSimpleName() + ".getActiveMaster"); 251 } 252 253 @Override 254 public void close() { 255 trace(() -> { 256 if (registryEndpointRefresher != null) { 257 registryEndpointRefresher.stop(); 258 } 259 if (rpcStubHolder != null) { 260 rpcStubHolder.close(); 261 } 262 }, getClass().getSimpleName() + ".close"); 263 } 264}