001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
021import static org.apache.hadoop.hbase.util.DNS.getHostname;
022
023import com.google.errorprone.annotations.RestrictedApi;
024import java.io.IOException;
025import java.net.UnknownHostException;
026import java.util.HashSet;
027import java.util.Set;
028import java.util.concurrent.CompletableFuture;
029import java.util.stream.Collectors;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.util.DNS.ServerType;
034import org.apache.yetus.audience.InterfaceAudience;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
038import org.apache.hbase.thirdparty.com.google.common.base.Strings;
039import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
040
041import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse;
044
045/**
046 * Master based registry implementation. Makes RPCs to the configured master addresses from config
047 * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
048 * <p/>
049 * It supports hedged reads, set the fan out of the requests batch by
050 * {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
051 * it(the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}).
052 * <p/>
053 * @deprecated Since 2.5.0, will be removed in 4.0.0. Use {@link RpcConnectionRegistry} instead.
054 */
055@Deprecated
056@InterfaceAudience.Private
057public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
058
059  /** Configuration key that controls the fan out of requests **/
060  public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
061    "hbase.client.master_registry.hedged.fanout";
062
063  public static final String MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS =
064    "hbase.client.master_registry.initial_refresh_delay_secs";
065
066  public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS =
067    "hbase.client.master_registry.refresh_interval_secs";
068
069  public static final String MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES =
070    "hbase.client.master_registry.min_secs_between_refreshes";
071
072  private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
073
074  /**
075   * Parses the list of master addresses from the provided configuration. Supported format is comma
076   * separated host[:port] values. If no port number if specified, default master port is assumed.
077   * @param conf Configuration to parse from.
078   */
079  public static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
080    Set<ServerName> masterAddrs = new HashSet<>();
081    String configuredMasters = getMasterAddr(conf);
082    for (String masterAddr : Splitter.onPattern(MASTER_ADDRS_CONF_SEPARATOR)
083      .split(configuredMasters)) {
084      HostAndPort masterHostPort =
085        HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
086      masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
087    }
088    Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
089    return masterAddrs;
090  }
091
092  private final String connectionString;
093
094  MasterRegistry(Configuration conf) throws IOException {
095    super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
096      MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
097    connectionString = getConnectionString(conf);
098  }
099
100  @Override
101  protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
102    return parseMasterAddrs(conf);
103  }
104
105  @Override
106  protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
107    return getMasters();
108  }
109
110  @Override
111  public String getConnectionString() {
112    return connectionString;
113  }
114
115  static String getConnectionString(Configuration conf) throws UnknownHostException {
116    return getMasterAddr(conf);
117  }
118
119  /**
120   * Builds the default master address end point if it is not specified in the configuration.
121   * <p/>
122   * Will be called in {@code HBaseTestingUtility}.
123   */
124  public static String getMasterAddr(Configuration conf) throws UnknownHostException {
125    String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY);
126    if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
127      return masterAddrFromConf;
128    }
129    String hostname = getHostname(conf, ServerType.MASTER);
130    int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
131    return String.format("%s:%d", hostname, port);
132  }
133
134  private static Set<ServerName> transformServerNames(GetMastersResponse resp) {
135    return resp.getMasterServersList().stream()
136      .map(s -> ProtobufUtil.toServerName(s.getServerName())).collect(Collectors.toSet());
137  }
138
139  @RestrictedApi(explanation = "Should only be called in tests", link = "",
140      allowedOnPath = ".*/(.*/MasterRegistry.java|src/test/.*)")
141  CompletableFuture<Set<ServerName>> getMasters() {
142    return this
143      .<GetMastersResponse> call(
144        (c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
145        r -> r.getMasterServersCount() != 0, "getMasters()")
146      .thenApply(MasterRegistry::transformServerNames);
147  }
148}