001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
021import static org.apache.hadoop.hbase.util.DNS.getHostname;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023
024import java.io.IOException;
025import java.net.UnknownHostException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.HashSet;
029import java.util.List;
030import java.util.Set;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ConcurrentLinkedQueue;
033import java.util.concurrent.ThreadLocalRandom;
034import java.util.concurrent.atomic.AtomicInteger;
035import java.util.function.Predicate;
036import java.util.stream.Collectors;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.HRegionLocation;
040import org.apache.hadoop.hbase.RegionLocations;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
043import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
044import org.apache.hadoop.hbase.ipc.HBaseRpcController;
045import org.apache.hadoop.hbase.ipc.RpcClient;
046import org.apache.hadoop.hbase.ipc.RpcClientFactory;
047import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
048import org.apache.hadoop.hbase.security.User;
049import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
058import org.apache.hadoop.hbase.util.DNS.ServerType;
059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
060import org.apache.hbase.thirdparty.com.google.common.base.Strings;
061import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
062import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
063import org.apache.hbase.thirdparty.com.google.protobuf.Message;
064import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
065import org.apache.yetus.audience.InterfaceAudience;
066
067/**
068 * Master based registry implementation. Makes RPCs to the configured master addresses from config
069 * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
070 * <p/>
071 * It supports hedged reads, set the fan out of the requests batch by
072 * {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable
073 * it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}).
074 * <p/>
075 * TODO: Handle changes to the configuration dynamically without having to restart the client.
076 */
077@InterfaceAudience.Private
078public class MasterRegistry implements ConnectionRegistry {
079
080  /** Configuration key that controls the fan out of requests **/
081  public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
082    "hbase.client.master_registry.hedged.fanout";
083
084  /** Default value for the fan out of hedged requests. **/
085  public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2;
086
087  private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";
088
089  private final int hedgedReadFanOut;
090
091  // Configured list of masters to probe the meta information from.
092  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
093
094  // RPC client used to talk to the masters.
095  private final RpcClient rpcClient;
096  private final RpcControllerFactory rpcControllerFactory;
097  private final int rpcTimeoutMs;
098
099  protected final MasterAddressRefresher masterAddressRefresher;
100
101  /**
102   * Parses the list of master addresses from the provided configuration. Supported format is comma
103   * separated host[:port] values. If no port number if specified, default master port is assumed.
104   * @param conf Configuration to parse from.
105   */
106  private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException {
107    Set<ServerName> masterAddrs = new HashSet<>();
108    String configuredMasters = getMasterAddr(conf);
109    for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
110      HostAndPort masterHostPort =
111        HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
112      masterAddrs.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
113    }
114    Preconditions.checkArgument(!masterAddrs.isEmpty(), "At least one master address is needed");
115    return masterAddrs;
116  }
117
118  MasterRegistry(Configuration conf) throws IOException {
119    this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
120      MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
121    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
122      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
123    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
124    // this through the master registry...
125    // This is a problem as we will use the cluster id to determine the authentication method
126    rpcClient = RpcClientFactory.createClient(conf, null);
127    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
128    // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
129    // by fetching the end points from this list.
130    populateMasterStubs(parseMasterAddrs(conf));
131    masterAddressRefresher = new MasterAddressRefresher(conf, this);
132  }
133
134  void populateMasterStubs(Set<ServerName> masters) throws IOException {
135    Preconditions.checkNotNull(masters);
136    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
137        ImmutableMap.builderWithExpectedSize(masters.size());
138    User user = User.getCurrent();
139    for (ServerName masterAddr : masters) {
140      builder.put(masterAddr,
141          ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
142    }
143    masterAddr2Stub = builder.build();
144  }
145
146  /**
147   * Builds the default master address end point if it is not specified in the configuration.
148   * <p/>
149   * Will be called in {@code HBaseTestingUtility}.
150   */
151  public static String getMasterAddr(Configuration conf) throws UnknownHostException {
152    String masterAddrFromConf = conf.get(MASTER_ADDRS_KEY);
153    if (!Strings.isNullOrEmpty(masterAddrFromConf)) {
154      return masterAddrFromConf;
155    }
156    String hostname = getHostname(conf, ServerType.MASTER);
157    int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
158    return String.format("%s:%d", hostname, port);
159  }
160
161  /**
162   * For describing the actual asynchronous rpc call.
163   * <p/>
164   * Typically, you can use lambda expression to implement this interface as
165   *
166   * <pre>
167   * (c, s, d) -> s.xxx(c, your request here, d)
168   * </pre>
169   */
170  @FunctionalInterface
171  private interface Callable<T> {
172    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
173  }
174
175  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
176    Callable<T> callable) {
177    HBaseRpcController controller = rpcControllerFactory.newController();
178    CompletableFuture<T> future = new CompletableFuture<>();
179    callable.call(controller, stub, resp -> {
180      if (controller.failed()) {
181        IOException failureReason = controller.getFailed();
182        future.completeExceptionally(failureReason);
183        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
184          // RPC has failed, trigger a refresh of master end points. We can have some spurious
185          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
186          masterAddressRefresher.refreshNow();
187        }
188      } else {
189        future.complete(resp);
190      }
191    });
192    return future;
193  }
194
195  private IOException badResponse(String debug) {
196    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
197  }
198
199  /**
200   * send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we
201   * will complete the future and quit. If all the requests in one round are failed, we will start
202   * another round to send requests concurrently tohedgedReadsFanout masters. If all masters have
203   * been tried and all of them are failed, we will fail the future.
204   */
205  private <T extends Message> void groupCall(CompletableFuture<T> future,
206      Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
207      int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
208      ConcurrentLinkedQueue<Throwable> errors) {
209    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
210    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
211    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
212      addListener(call(masterStubs.get(i), callable), (r, e) -> {
213        // a simple check to skip all the later operations earlier
214        if (future.isDone()) {
215          return;
216        }
217        if (e == null && !isValidResp.test(r)) {
218          e = badResponse(debug);
219        }
220        if (e != null) {
221          // make sure when remaining reaches 0 we have all exceptions in the errors queue
222          errors.add(e);
223          if (remaining.decrementAndGet() == 0) {
224            if (endIndexExclusive == masterStubs.size()) {
225              // we are done, complete the future with exception
226              RetriesExhaustedException ex = new RetriesExhaustedException("masters",
227                masterStubs.size(), new ArrayList<>(errors));
228              future.completeExceptionally(
229                new MasterRegistryFetchException(masterServers, ex));
230            } else {
231              groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
232                  isValidResp, debug, errors);
233            }
234          }
235        } else {
236          // do not need to decrement the counter any more as we have already finished the future.
237          future.complete(r);
238        }
239      });
240    }
241  }
242
243  private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
244    Predicate<T> isValidResp, String debug) {
245    ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub;
246    Set<ServerName> masterServers = masterAddr2StubRef.keySet();
247    List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values());
248    Collections.shuffle(masterStubs, ThreadLocalRandom.current());
249    CompletableFuture<T> future = new CompletableFuture<>();
250    groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
251        new ConcurrentLinkedQueue<>());
252    return future;
253  }
254
255  /**
256   * Simple helper to transform the result of getMetaRegionLocations() rpc.
257   */
258  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
259    List<HRegionLocation> regionLocations = new ArrayList<>();
260    resp.getMetaLocationsList()
261      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
262    return new RegionLocations(regionLocations);
263  }
264
265  @Override
266  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
267    return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
268      GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
269      "getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
270  }
271
272  @Override
273  public CompletableFuture<String> getClusterId() {
274    return this
275      .<GetClusterIdResponse> call(
276        (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
277        GetClusterIdResponse::hasClusterId, "getClusterId()")
278      .thenApply(GetClusterIdResponse::getClusterId);
279  }
280
281  private static boolean hasActiveMaster(GetMastersResponse resp) {
282    List<GetMastersResponseEntry> activeMasters =
283        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
284        Collectors.toList());
285    return activeMasters.size() == 1;
286  }
287
288  private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException {
289    List<GetMastersResponseEntry> activeMasters =
290        resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
291            Collectors.toList());
292    if (activeMasters.size() != 1) {
293      throw new IOException(String.format("Incorrect number of active masters encountered." +
294          " Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters));
295    }
296    return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
297  }
298
299  @Override
300  public CompletableFuture<ServerName> getActiveMaster() {
301    CompletableFuture<ServerName> future = new CompletableFuture<>();
302    addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
303      MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> {
304        if (ex != null) {
305          future.completeExceptionally(ex);
306        }
307        ServerName result = null;
308        try {
309          result = filterActiveMaster((GetMastersResponse)resp);
310        } catch (IOException e) {
311          future.completeExceptionally(e);
312        }
313        future.complete(result);
314      });
315    return future;
316  }
317
318  private static List<ServerName> transformServerNames(GetMastersResponse resp) {
319    return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
320        s.getServerName())).collect(Collectors.toList());
321  }
322
323  CompletableFuture<List<ServerName>> getMasters() {
324    return this
325        .<GetMastersResponse> call((c, s, d) -> s.getMasters(
326            c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
327            "getMasters()").thenApply(MasterRegistry::transformServerNames);
328  }
329
330  Set<ServerName> getParsedMasterServers() {
331    return masterAddr2Stub.keySet();
332  }
333
334  @Override
335  public void close() {
336    if (masterAddressRefresher != null) {
337      masterAddressRefresher.close();
338    }
339    if (rpcClient != null) {
340      rpcClient.close();
341    }
342  }
343}