001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
021import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023
024import com.google.errorprone.annotations.RestrictedApi;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.ConcurrentLinkedQueue;
032import java.util.concurrent.ThreadLocalRandom;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.function.Predicate;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.RegionLocations;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
041import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
042import org.apache.hadoop.hbase.ipc.HBaseRpcController;
043import org.apache.hadoop.hbase.ipc.RpcClient;
044import org.apache.hadoop.hbase.ipc.RpcClientFactory;
045import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
046import org.apache.hadoop.hbase.security.User;
047import org.apache.hadoop.hbase.util.FutureUtils;
048import org.apache.yetus.audience.InterfaceAudience;
049
050import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
051import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
052import org.apache.hbase.thirdparty.com.google.protobuf.Message;
053import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
054
055import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
060import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
063
064/**
065 * Base class for rpc based connection registry implementation.
066 * <p/>
067 * The implementation needs a bootstrap node list in configuration, and then it will use the methods
068 * in {@link ClientMetaService} to refresh the connection registry end points.
069 * <p/>
070 * It also supports hedged reads, the default fan out value is 2.
071 * <p/>
072 * For the actual configuration names, see javadoc of sub classes.
073 */
074@InterfaceAudience.Private
075abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {
076
077  /** Default value for the fan out of hedged requests. **/
078  public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;
079
080  private final int hedgedReadFanOut;
081
082  // Configured list of end points to probe the meta information from.
083  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;
084
085  // RPC client used to talk to the masters.
086  private final RpcClient rpcClient;
087  private final RpcControllerFactory rpcControllerFactory;
088  private final int rpcTimeoutMs;
089
090  private final RegistryEndpointsRefresher registryEndpointRefresher;
091
092  protected AbstractRpcBasedConnectionRegistry(Configuration conf,
093    String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
094    String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
095    throws IOException {
096    this.hedgedReadFanOut =
097      Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
098    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
099      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
100    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
101    // this through the master registry...
102    // This is a problem as we will use the cluster id to determine the authentication method
103    rpcClient = RpcClientFactory.createClient(conf, null);
104    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
105    populateStubs(getBootstrapNodes(conf));
106    // could return null here is refresh interval is less than zero
107    registryEndpointRefresher =
108      RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
109        refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
110  }
111
112  protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
113
114  protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();
115
116  private void refreshStubs() throws IOException {
117    populateStubs(FutureUtils.get(fetchEndpoints()));
118  }
119
120  private void populateStubs(Set<ServerName> addrs) throws IOException {
121    Preconditions.checkNotNull(addrs);
122    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
123      ImmutableMap.builderWithExpectedSize(addrs.size());
124    User user = User.getCurrent();
125    for (ServerName masterAddr : addrs) {
126      builder.put(masterAddr,
127        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
128    }
129    addr2Stub = builder.build();
130  }
131
132  /**
133   * For describing the actual asynchronous rpc call.
134   * <p/>
135   * Typically, you can use lambda expression to implement this interface as
136   *
137   * <pre>
138   * (c, s, d) -> s.xxx(c, your request here, d)
139   * </pre>
140   */
141  @FunctionalInterface
142  protected interface Callable<T> {
143    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
144  }
145
146  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
147    Callable<T> callable) {
148    HBaseRpcController controller = rpcControllerFactory.newController();
149    CompletableFuture<T> future = new CompletableFuture<>();
150    callable.call(controller, stub, resp -> {
151      if (controller.failed()) {
152        IOException failureReason = controller.getFailed();
153        future.completeExceptionally(failureReason);
154        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
155          // RPC has failed, trigger a refresh of end points. We can have some spurious
156          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
157          registryEndpointRefresher.refreshNow();
158        }
159      } else {
160        future.complete(resp);
161      }
162    });
163    return future;
164  }
165
166  private IOException badResponse(String debug) {
167    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
168  }
169
170  /**
171   * send requests concurrently to hedgedReadsFanout end points. If any of the request is succeeded,
172   * we will complete the future and quit. If all the requests in one round are failed, we will
173   * start another round to send requests concurrently tohedgedReadsFanout end points. If all end
174   * points have been tried and all of them are failed, we will fail the future.
175   */
176  private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
177    List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
178    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
179    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
180    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
181    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
182      addListener(call(stubs.get(i), callable), (r, e) -> {
183        // a simple check to skip all the later operations earlier
184        if (future.isDone()) {
185          return;
186        }
187        if (e == null && !isValidResp.test(r)) {
188          e = badResponse(debug);
189        }
190        if (e != null) {
191          // make sure when remaining reaches 0 we have all exceptions in the errors queue
192          errors.add(e);
193          if (remaining.decrementAndGet() == 0) {
194            if (endIndexExclusive == stubs.size()) {
195              // we are done, complete the future with exception
196              RetriesExhaustedException ex =
197                new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
198              future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
199            } else {
200              groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
201                errors);
202            }
203          }
204        } else {
205          // do not need to decrement the counter any more as we have already finished the future.
206          future.complete(r);
207        }
208      });
209    }
210  }
211
212  protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
213    Predicate<T> isValidResp, String debug) {
214    ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
215    Set<ServerName> servers = addr2StubRef.keySet();
216    List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
217    Collections.shuffle(stubs, ThreadLocalRandom.current());
218    CompletableFuture<T> future = new CompletableFuture<>();
219    groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
220      new ConcurrentLinkedQueue<>());
221    return future;
222  }
223
224  @RestrictedApi(explanation = "Should only be called in tests", link = "",
225      allowedOnPath = ".*/src/test/.*")
226  Set<ServerName> getParsedServers() {
227    return addr2Stub.keySet();
228  }
229
230  /**
231   * Simple helper to transform the result of getMetaRegionLocations() rpc.
232   */
233  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
234    List<HRegionLocation> regionLocations = new ArrayList<>();
235    resp.getMetaLocationsList()
236      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
237    return new RegionLocations(regionLocations);
238  }
239
240  @Override
241  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
242    return tracedFuture(
243      () -> this
244        .<GetMetaRegionLocationsResponse> call(
245          (c, s, d) -> s.getMetaRegionLocations(c,
246            GetMetaRegionLocationsRequest.getDefaultInstance(), d),
247          r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
248        .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations),
249      getClass().getSimpleName() + ".getMetaRegionLocations");
250  }
251
252  @Override
253  public CompletableFuture<String> getClusterId() {
254    return tracedFuture(
255      () -> this
256        .<GetClusterIdResponse> call(
257          (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
258          GetClusterIdResponse::hasClusterId, "getClusterId()")
259        .thenApply(GetClusterIdResponse::getClusterId),
260      getClass().getSimpleName() + ".getClusterId");
261  }
262
263  @Override
264  public CompletableFuture<ServerName> getActiveMaster() {
265    return tracedFuture(
266      () -> this
267        .<GetActiveMasterResponse> call(
268          (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
269          GetActiveMasterResponse::hasServerName, "getActiveMaster()")
270        .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())),
271      getClass().getSimpleName() + ".getClusterId");
272  }
273
274  @Override
275  public void close() {
276    trace(() -> {
277      if (registryEndpointRefresher != null) {
278        registryEndpointRefresher.stop();
279      }
280      if (rpcClient != null) {
281        rpcClient.close();
282      }
283    }, getClass().getSimpleName() + ".close");
284  }
285}