001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY; 021import static org.apache.hadoop.hbase.util.DNS.getHostname; 022import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 023 024import java.io.IOException; 025import java.net.UnknownHostException; 026import java.util.ArrayList; 027import java.util.Collections; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Set; 031import java.util.concurrent.CompletableFuture; 032import java.util.concurrent.ConcurrentLinkedQueue; 033import java.util.concurrent.ThreadLocalRandom; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.function.Predicate; 036import java.util.stream.Collectors; 037import org.apache.hadoop.conf.Configuration; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.HRegionLocation; 040import org.apache.hadoop.hbase.RegionLocations; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 043import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; 044import org.apache.hadoop.hbase.ipc.HBaseRpcController; 045import org.apache.hadoop.hbase.ipc.RpcClient; 046import org.apache.hadoop.hbase.ipc.RpcClientFactory; 047import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 048import org.apache.hadoop.hbase.security.User; 049import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest; 057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse; 058import org.apache.hadoop.hbase.util.DNS.ServerType; 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.common.base.Strings; 061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 062import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort; 063import org.apache.hbase.thirdparty.com.google.protobuf.Message; 064import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 065import org.apache.yetus.audience.InterfaceAudience; 066 067/** 068 * Master based registry implementation. Makes RPCs to the configured master addresses from config 069 * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}. 070 * <p/> 071 * It supports hedged reads, set the fan out of the requests batch by 072 * {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable 073 * it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}). 074 * <p/> 075 * TODO: Handle changes to the configuration dynamically without having to restart the client. 076 */ 077@InterfaceAudience.Private 078public class MasterRegistry implements ConnectionRegistry { 079 080 /** Configuration key that controls the fan out of requests **/ 081 public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY = 082 "hbase.client.master_registry.hedged.fanout"; 083 084 /** Default value for the fan out of hedged requests. **/ 085 public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2; 086 087 private static final String MASTER_ADDRS_CONF_SEPARATOR = ","; 088 089 private final int hedgedReadFanOut; 090 091 // Configured list of masters to probe the meta information from. 092 private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub; 093 094 // RPC client used to talk to the masters. 095 private final RpcClient rpcClient; 096 private final RpcControllerFactory rpcControllerFactory; 097 private final int rpcTimeoutMs; 098 099 protected final MasterAddressRefresher masterAddressRefresher; 100 101 /** 102 * Parses the list of master addresses from the provided configuration. Supported format is comma 103 * separated host[:port] values. If no port number if specified, default master port is assumed. 104 * @param conf Configuration to parse from. 105 */ 106 private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException { 107 Set<ServerName> masterAddrs = new HashSet<>(); 108 String configuredMasters = getMasterAddr(conf); 109 for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) { 110 HostAndPort masterHostPort = 111 HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT); 112 masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE)); 113 } 114 Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed"); 115 return masterAddrs; 116 } 117 118 MasterRegistry(Configuration conf) throws IOException { 119 this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 120 MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); 121 rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, 122 conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); 123 // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch 124 // this through the master registry... 125 // This is a problem as we will use the cluster id to determine the authentication method 126 rpcClient = RpcClientFactory.createClient(conf, null); 127 rpcControllerFactory = RpcControllerFactory.instantiate(conf); 128 // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters 129 // by fetching the end points from this list. 130 populateMasterStubs(parseMasterAddrs(conf)); 131 masterAddressRefresher = new MasterAddressRefresher(conf, this); 132 } 133 134 void populateMasterStubs(Set<ServerName> masters) throws IOException { 135 Preconditions.checkNotNull(masters); 136 ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder = 137 ImmutableMap.builderWithExpectedSize(masters.size()); 138 User user = User.getCurrent(); 139 for (ServerName masterAddr : masters) { 140 builder.put(masterAddr, 141 ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); 142 } 143 masterAddr2Stub = builder.build(); 144 } 145 146 /** 147 * Builds the default master address end point if it is not specified in the configuration. 148 * <p/> 149 * Will be called in {@code HBaseTestingUtility}. 150 */ 151 public static String getMasterAddr(Configuration conf) throws UnknownHostException { 152 String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY); 153 if (!Strings.isNullOrEmpty(masterAddrFromConf)) { 154 return masterAddrFromConf; 155 } 156 String hostname = getHostname(conf, ServerType.MASTER); 157 int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT); 158 return String.format("%s:%d", hostname, port); 159 } 160 161 /** 162 * For describing the actual asynchronous rpc call. 163 * <p/> 164 * Typically, you can use lambda expression to implement this interface as 165 * 166 * <pre> 167 * (c, s, d) -> s.xxx(c, your request here, d) 168 * </pre> 169 */ 170 @FunctionalInterface 171 private interface Callable<T> { 172 void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done); 173 } 174 175 private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub, 176 Callable<T> callable) { 177 HBaseRpcController controller = rpcControllerFactory.newController(); 178 CompletableFuture<T> future = new CompletableFuture<>(); 179 callable.call(controller, stub, resp -> { 180 if (controller.failed()) { 181 IOException failureReason = controller.getFailed(); 182 future.completeExceptionally(failureReason); 183 if (ClientExceptionsUtil.isConnectionException(failureReason)) { 184 // RPC has failed, trigger a refresh of master end points. We can have some spurious 185 // refreshes, but that is okay since the RPC is not expensive and not in a hot path. 186 masterAddressRefresher.refreshNow(); 187 } 188 } else { 189 future.complete(resp); 190 } 191 }); 192 return future; 193 } 194 195 private IOException badResponse(String debug) { 196 return new IOException(String.format("Invalid result for request %s. Will be retried", debug)); 197 } 198 199 /** 200 * send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we 201 * will complete the future and quit. If all the requests in one round are failed, we will start 202 * another round to send requests concurrently tohedgedReadsFanout masters. If all masters have 203 * been tried and all of them are failed, we will fail the future. 204 */ 205 private <T extends Message> void groupCall(CompletableFuture<T> future, 206 Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs, 207 int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug, 208 ConcurrentLinkedQueue<Throwable> errors) { 209 int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size()); 210 AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive); 211 for (int i = startIndexInclusive; i < endIndexExclusive; i++) { 212 addListener(call(masterStubs.get(i), callable), (r, e) -> { 213 // a simple check to skip all the later operations earlier 214 if (future.isDone()) { 215 return; 216 } 217 if (e == null && !isValidResp.test(r)) { 218 e = badResponse(debug); 219 } 220 if (e != null) { 221 // make sure when remaining reaches 0 we have all exceptions in the errors queue 222 errors.add(e); 223 if (remaining.decrementAndGet() == 0) { 224 if (endIndexExclusive == masterStubs.size()) { 225 // we are done, complete the future with exception 226 RetriesExhaustedException ex = new RetriesExhaustedException("masters", 227 masterStubs.size(), new ArrayList<>(errors)); 228 future.completeExceptionally( 229 new MasterRegistryFetchException(masterServers, ex)); 230 } else { 231 groupCall(future, masterServers, masterStubs, endIndexExclusive, callable, 232 isValidResp, debug, errors); 233 } 234 } 235 } else { 236 // do not need to decrement the counter any more as we have already finished the future. 237 future.complete(r); 238 } 239 }); 240 } 241 } 242 243 private <T extends Message> CompletableFuture<T> call(Callable<T> callable, 244 Predicate<T> isValidResp, String debug) { 245 ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub; 246 Set<ServerName> masterServers = masterAddr2StubRef.keySet(); 247 List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values()); 248 Collections.shuffle(masterStubs, ThreadLocalRandom.current()); 249 CompletableFuture<T> future = new CompletableFuture<>(); 250 groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug, 251 new ConcurrentLinkedQueue<>()); 252 return future; 253 } 254 255 /** 256 * Simple helper to transform the result of getMetaRegionLocations() rpc. 257 */ 258 private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) { 259 List<HRegionLocation> regionLocations = new ArrayList<>(); 260 resp.getMetaLocationsList() 261 .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location))); 262 return new RegionLocations(regionLocations); 263 } 264 265 @Override 266 public CompletableFuture<RegionLocations> getMetaRegionLocations() { 267 return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c, 268 GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0, 269 "getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations); 270 } 271 272 @Override 273 public CompletableFuture<String> getClusterId() { 274 return this 275 .<GetClusterIdResponse> call( 276 (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d), 277 GetClusterIdResponse::hasClusterId, "getClusterId()") 278 .thenApply(GetClusterIdResponse::getClusterId); 279 } 280 281 private static boolean hasActiveMaster(GetMastersResponse resp) { 282 List<GetMastersResponseEntry> activeMasters = 283 resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect( 284 Collectors.toList()); 285 return activeMasters.size() == 1; 286 } 287 288 private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException { 289 List<GetMastersResponseEntry> activeMasters = 290 resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect( 291 Collectors.toList()); 292 if (activeMasters.size() != 1) { 293 throw new IOException(String.format("Incorrect number of active masters encountered." + 294 " Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters)); 295 } 296 return ProtobufUtil.toServerName(activeMasters.get(0).getServerName()); 297 } 298 299 @Override 300 public CompletableFuture<ServerName> getActiveMaster() { 301 CompletableFuture<ServerName> future = new CompletableFuture<>(); 302 addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d), 303 MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> { 304 if (ex != null) { 305 future.completeExceptionally(ex); 306 } 307 ServerName result = null; 308 try { 309 result = filterActiveMaster((GetMastersResponse)resp); 310 } catch (IOException e) { 311 future.completeExceptionally(e); 312 } 313 future.complete(result); 314 }); 315 return future; 316 } 317 318 private static List<ServerName> transformServerNames(GetMastersResponse resp) { 319 return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName( 320 s.getServerName())).collect(Collectors.toList()); 321 } 322 323 CompletableFuture<List<ServerName>> getMasters() { 324 return this 325 .<GetMastersResponse> call((c, s, d) -> s.getMasters( 326 c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0, 327 "getMasters()").thenApply(MasterRegistry::transformServerNames); 328 } 329 330 Set<ServerName> getParsedMasterServers() { 331 return masterAddr2Stub.keySet(); 332 } 333 334 @Override 335 public void close() { 336 if (masterAddressRefresher != null) { 337 masterAddressRefresher.close(); 338 } 339 if (rpcClient != null) { 340 rpcClient.close(); 341 } 342 } 343}