/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;

/**
 * Connection registry backed by rpc. Registry information is obtained through the
 * {@link ClientMetaService}.
 * <p/>
 * A list of bootstrap nodes is required at start up; afterwards the list is refreshed
 * periodically through the {@link ClientMetaService}.
 * <p/>
 * Usually you can simply configure the masters as the bootstrap nodes, as they also implement the
 * {@link ClientMetaService}. After the bootstrap nodes have been refreshed we will switch to
 * region servers.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {

  /** Configuration key that controls the fan out of requests **/
  public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";

  /**
   * Configuration key for the delay before the initial refresh.
   * <p/>
   * End users may configure any node of a cluster as an initial bootstrap node, so many of them
   * may end up pointing at the same machine and overload it. The initial refresh therefore uses a
   * shorter delay, so that clients quickly move over to the bootstrap nodes we want them to
   * connect to.
   * <p/>
   * The default value for initial refresh delay is 1/10 of periodic refresh interval.
   */
  public static final String INITIAL_REFRESH_DELAY_SECS =
    "hbase.client.bootstrap.initial_refresh_delay_secs";

  /** Configuration key handed to the base class as the periodic refresh interval setting. */
  public static final String PERIODIC_REFRESH_INTERVAL_SECS =
    "hbase.client.bootstrap.refresh_interval_secs";

  /** Configuration key handed to the base class as the min-secs-between-refreshes setting. */
  public static final String MIN_SECS_BETWEEN_REFRESHES =
    "hbase.client.bootstrap.min_secs_between_refreshes";

  /**
   * Configuration key holding the separated list of bootstrap node addresses. When it is blank,
   * the configured master addresses are used instead.
   */
  public static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers";

  /** Character separating the individual addresses in {@link #BOOTSTRAP_NODES}. */
  private static final char ADDRS_CONF_SEPARATOR = ',';

  /** Splits a {@link #BOOTSTRAP_NODES} value into addresses, trimming each of them. */
  private static final Splitter ADDRS_SPLITTER = Splitter.on(ADDRS_CONF_SEPARATOR).trimResults();

  /** Connection string computed once at construction time. */
  private final String connectionString;

  /**
   * Creates the registry, wiring this class' configuration keys into the base class and then
   * computing the connection string.
   * @param conf the configuration to read the bootstrap settings from
   * @param user the user handed over to the base class
   * @throws IOException if the base class or the connection string construction fails
   */
  RpcConnectionRegistry(Configuration conf, User user) throws IOException {
    super(conf, user, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS,
      PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES);
    this.connectionString = buildConnectionString(conf);
  }

  /**
   * Computes the connection string: the trimmed, re-joined {@link #BOOTSTRAP_NODES} value when
   * one is configured, the master registry's connection string otherwise.
   */
  private String buildConnectionString(Configuration conf) throws UnknownHostException {
    final String bootstrapServers = conf.get(BOOTSTRAP_NODES);
    if (!StringUtils.isBlank(bootstrapServers)) {
      final String delimiter = String.valueOf(ADDRS_CONF_SEPARATOR);
      return ADDRS_SPLITTER.splitToStream(bootstrapServers).collect(Collectors.joining(delimiter));
    }
    // nothing configured, so describe the connection by the master addresses
    return MasterRegistry.getConnectionString(conf);
  }

  /**
   * Resolves the initial bootstrap nodes. The {@link #BOOTSTRAP_NODES} setting takes precedence;
   * the master addresses are only parsed when that setting is blank.
   */
  @Override
  protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
    final String bootstrapServers = conf.get(BOOTSTRAP_NODES);
    if (StringUtils.isBlank(bootstrapServers)) {
      // no explicit bootstrap nodes, fall back to the master addresses
      return MasterRegistry.parseMasterAddrs(conf);
    }
    return ADDRS_SPLITTER.splitToStream(bootstrapServers)
      .map(hostAndPort -> ServerName.valueOf(hostAndPort, ServerName.NON_STARTCODE))
      .collect(Collectors.toSet());
  }

  /** Returns the connection string computed when this registry was created. */
  @Override
  public String getConnectionString() {
    return connectionString;
  }

  /** Converts the protobuf server names carried by the response into a {@link ServerName} set. */
  private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
    return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
      .collect(Collectors.toSet());
  }

  /**
   * Asks for the current bootstrap nodes via rpc. A response is only accepted when it carries at
   * least one server name.
   */
  private CompletableFuture<Set<ServerName>> getBootstrapNodes() {
    final CompletableFuture<GetBootstrapNodesResponse> response =
      this.<GetBootstrapNodesResponse> call(
        (c, s, d) -> s.getBootstrapNodes(c, GetBootstrapNodesRequest.getDefaultInstance(), d),
        r -> r.getServerNameCount() != 0, "getBootstrapNodes()");
    return response.thenApply(RpcConnectionRegistry::transformServerNames);
  }

  /** Fetches the endpoints for refreshing, which for this registry are the bootstrap nodes. */
  @Override
  protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
    return getBootstrapNodes();
  }
}