001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
021import static org.apache.hadoop.hbase.util.DNS.getHostname;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023
024import java.io.IOException;
025import java.net.UnknownHostException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Set;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ConcurrentLinkedQueue;
033import java.util.concurrent.ThreadLocalRandom;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.function.Predicate;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.HRegionLocation;
039import org.apache.hadoop.hbase.RegionLocations;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
042import org.apache.hadoop.hbase.ipc.HBaseRpcController;
043import org.apache.hadoop.hbase.ipc.RpcClient;
044import org.apache.hadoop.hbase.ipc.RpcClientFactory;
045import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
046import org.apache.hadoop.hbase.security.User;
047import org.apache.hadoop.hbase.util.DNS.ServerType;
048import org.apache.yetus.audience.InterfaceAudience;
049
050import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
051import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
052import org.apache.hbase.thirdparty.com.google.common.base.Strings;
053import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
054import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
055import org.apache.hbase.thirdparty.com.google.protobuf.Message;
056import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
057
058import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
066
067/**
068 * Master based registry implementation. Makes RPCs to the configured master addresses from config
069 * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
070 * <p/>
071 * It supports hedged reads, set the fan out of the requests batch by
072 * {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
073 * it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
074 * <p/>
075 * TODO: Handle changes to the configuration dynamically without having to restart the client.
076 */
077@InterfaceAudience.Private
078public class MasterRegistry implements ConnectionRegistry {
079
080  /** Configuration key that controls the fan out of requests **/
081  public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
082    "hbase.client.master_registry.hedged.fanout";
083
084  /** Default value for the fan out of hedged requests. **/
085  public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
086
087  private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
088
089  private final int hedgedReadFanOut;
090
091  // Configured list of masters to probe the meta information from.
092  private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
093
094  // RPC client used to talk to the masters.
095  private final RpcClient rpcClient;
096  private final RpcControllerFactory rpcControllerFactory;
097
098  /**
099   * Parses the list of master addresses from the provided configuration. Supported format is comma
100   * separated host[:port] values. If no port number if specified, default master port is assumed.
101   * @param conf Configuration to parse from.
102   */
103  private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
104    Set<ServerName> masterAddrs = new HashSet<>();
105    String configuredMasters = getMasterAddr(conf);
106    for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
107      HostAndPort masterHostPort =
108        HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
109      masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
110    }
111    Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
112    return masterAddrs;
113  }
114
115  MasterRegistry(Configuration conf) throws IOException {
116    this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
117      MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
118    int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
119      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
120    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
121    // this through the master registry...
122    // This is a problem as we will use the cluster id to determine the authentication method
123    rpcClient = RpcClientFactory.createClient(conf, null);
124    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
125    Set<ServerName> masterAddrs = parseMasterAddrs(conf);
126    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
127      ImmutableMap.builderWithExpectedSize(masterAddrs.size());
128    User user = User.getCurrent();
129    for (ServerName masterAddr : masterAddrs) {
130      builder.put(masterAddr,
131        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
132    }
133    masterAddr2Stub = builder.build();
134  }
135
136  /**
137   * Builds the default master address end point if it is not specified in the configuration.
138   * <p/>
139   * Will be called in {@code HBaseTestingUtility}.
140   */
141  @VisibleForTesting
142  public static String getMasterAddr(Configuration conf) throws UnknownHostException {
143    String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY);
144    if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
145      return masterAddrFromConf;
146    }
147    String hostname = getHostname(conf, ServerType.MASTER);
148    int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
149    return String.format("%s:%d", hostname, port);
150  }
151
152  /**
153   * For describing the actual asynchronous rpc call.
154   * <p/>
155   * Typically, you can use lambda expression to implement this interface as
156   *
157   * <pre>
158   * (c, s, d) -> s.xxx(c, your request here, d)
159   * </pre>
160   */
161  @FunctionalInterface
162  private interface Callable<T> {
163    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
164  }
165
166  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
167    Callable<T> callable) {
168    HBaseRpcController controller = rpcControllerFactory.newController();
169    CompletableFuture<T> future = new CompletableFuture<>();
170    callable.call(controller, stub, resp -> {
171      if (controller.failed()) {
172        future.completeExceptionally(controller.getFailed());
173      } else {
174        future.complete(resp);
175      }
176    });
177    return future;
178  }
179
180  private IOException badResponse(String debug) {
181    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
182  }
183
184  /**
185   * send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we
186   * will complete the future and quit. If all the requests in one round are failed, we will start
187   * another round to send requests concurrently tohedgedReadsFanout masters. If all masters have
188   * been tried and all of them are failed, we will fail the future.
189   */
190  private <T extends Message> void groupCall(CompletableFuture<T> future,
191    List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
192    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
193    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
194    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
195    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
196      addListener(call(masterStubs.get(i), callable), (r, e) -> {
197        // a simple check to skip all the later operations earlier
198        if (future.isDone()) {
199          return;
200        }
201        if (e == null && !isValidResp.test(r)) {
202          e = badResponse(debug);
203        }
204        if (e != null) {
205          // make sure when remaining reaches 0 we have all exceptions in the errors queue
206          errors.add(e);
207          if (remaining.decrementAndGet() == 0) {
208            if (endIndexExclusive == masterStubs.size()) {
209              // we are done, complete the future with exception
210              RetriesExhaustedException ex = new RetriesExhaustedException("masters",
211                masterStubs.size(), new ArrayList<>(errors));
212              future.completeExceptionally(
213                new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
214            } else {
215              groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
216                errors);
217            }
218          }
219        } else {
220          // do not need to decrement the counter any more as we have already finished the future.
221          future.complete(r);
222        }
223      });
224    }
225  }
226
227  private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
228    Predicate<T> isValidResp, String debug) {
229    List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
230    Collections.shuffle(masterStubs, ThreadLocalRandom.current());
231    CompletableFuture<T> future = new CompletableFuture<>();
232    groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
233    return future;
234  }
235
236  /**
237   * Simple helper to transform the result of getMetaRegionLocations() rpc.
238   */
239  private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
240    List<HRegionLocation> regionLocations = new ArrayList<>();
241    resp.getMetaLocationsList()
242      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
243    return new RegionLocations(regionLocations);
244  }
245
246  @Override
247  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
248    return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
249      GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
250      "getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
251  }
252
253  @Override
254  public CompletableFuture<String> getClusterId() {
255    return this
256      .<GetClusterIdResponse> call(
257        (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
258        GetClusterIdResponse::hasClusterId, "getClusterId()")
259      .thenApply(GetClusterIdResponse::getClusterId);
260  }
261
262  private ServerName transformServerName(GetActiveMasterResponse resp) {
263    return ProtobufUtil.toServerName(resp.getServerName());
264  }
265
266  @Override
267  public CompletableFuture<ServerName> getActiveMaster() {
268    return this
269      .<GetActiveMasterResponse> call(
270        (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
271        GetActiveMasterResponse::hasServerName, "getActiveMaster()")
272      .thenApply(this::transformServerName);
273  }
274
275  @VisibleForTesting
276  Set<ServerName> getParsedMasterServers() {
277    return masterAddr2Stub.keySet();
278  }
279
280  @Override
281  public void close() {
282    if (rpcClient != null) {
283      rpcClient.close();
284    }
285  }
286}