001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.net.UnknownHostException; 022import java.util.Set; 023import java.util.concurrent.CompletableFuture; 024import java.util.stream.Collectors; 025import org.apache.commons.lang3.StringUtils; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseInterfaceAudience; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.yetus.audience.InterfaceAudience; 030 031import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 032 033import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse; 037 038/** 039 * Rpc based connection registry. It will make use of the {@link ClientMetaService} to get registry 040 * information. 041 * <p/> 042 * It needs bootstrap node list when start up, and then it will use {@link ClientMetaService} to 043 * refresh the bootstrap node list periodically. 044 * <p/> 045 * Usually, you could set masters as the bootstrap nodes,as they will also implement the 046 * {@link ClientMetaService}, and then, we will switch to use region servers after refreshing the 047 * bootstrap nodes. 048 */ 049@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 050public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry { 051 052 /** Configuration key that controls the fan out of requests **/ 053 public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout"; 054 055 /** 056 * As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is 057 * possible that different end users will configure the same machine which makes the machine over 058 * load. So we should have a shorter delay for the initial refresh, to let users quickly switch to 059 * the bootstrap nodes we want them to connect to. 060 * <p/> 061 * The default value for initial refresh delay is 1/10 of periodic refresh interval. 062 */ 063 public static final String INITIAL_REFRESH_DELAY_SECS = 064 "hbase.client.bootstrap.initial_refresh_delay_secs"; 065 066 public static final String PERIODIC_REFRESH_INTERVAL_SECS = 067 "hbase.client.bootstrap.refresh_interval_secs"; 068 069 public static final String MIN_SECS_BETWEEN_REFRESHES = 070 "hbase.client.bootstrap.min_secs_between_refreshes"; 071 072 public static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers"; 073 074 private static final char ADDRS_CONF_SEPARATOR = ','; 075 076 private final String connectionString; 077 078 RpcConnectionRegistry(Configuration conf) throws IOException { 079 super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS, 080 MIN_SECS_BETWEEN_REFRESHES); 081 connectionString = buildConnectionString(conf); 082 } 083 084 private String buildConnectionString(Configuration conf) throws UnknownHostException { 085 final String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES); 086 if (StringUtils.isBlank(configuredBootstrapNodes)) { 087 return MasterRegistry.getConnectionString(conf); 088 } 089 return Splitter.on(ADDRS_CONF_SEPARATOR).trimResults().splitToStream(configuredBootstrapNodes) 090 .collect(Collectors.joining(String.valueOf(ADDRS_CONF_SEPARATOR))); 091 } 092 093 @Override 094 protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException { 095 // try get bootstrap nodes config first 096 String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES); 097 if (!StringUtils.isBlank(configuredBootstrapNodes)) { 098 return Splitter.on(ADDRS_CONF_SEPARATOR).trimResults().splitToStream(configuredBootstrapNodes) 099 .map(addr -> ServerName.valueOf(addr, ServerName.NON_STARTCODE)) 100 .collect(Collectors.toSet()); 101 } else { 102 // otherwise, just use master addresses 103 return MasterRegistry.parseMasterAddrs(conf); 104 } 105 } 106 107 @Override 108 public String getConnectionString() { 109 return connectionString; 110 } 111 112 private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) { 113 return resp.getServerNameList().stream().map(ProtobufUtil::toServerName) 114 .collect(Collectors.toSet()); 115 } 116 117 private CompletableFuture<Set<ServerName>> getBootstrapNodes() { 118 return this 119 .<GetBootstrapNodesResponse> call( 120 (c, s, d) -> s.getBootstrapNodes(c, GetBootstrapNodesRequest.getDefaultInstance(), d), 121 r -> r.getServerNameCount() != 0, "getBootstrapNodes()") 122 .thenApply(RpcConnectionRegistry::transformServerNames); 123 } 124 125 @Override 126 protected CompletableFuture<Set<ServerName>> fetchEndpoints() { 127 return getBootstrapNodes(); 128 } 129}