001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
021import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
022import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
023
024import com.google.errorprone.annotations.RestrictedApi;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Collections;
028import java.util.List;
029import java.util.Set;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.ConcurrentLinkedQueue;
032import java.util.concurrent.ThreadLocalRandom;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.function.Predicate;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.HRegionLocation;
037import org.apache.hadoop.hbase.RegionLocations;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
040import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
041import org.apache.hadoop.hbase.ipc.HBaseRpcController;
042import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
043import org.apache.hadoop.hbase.security.User;
044import org.apache.hadoop.hbase.util.FutureUtils;
045import org.apache.yetus.audience.InterfaceAudience;
046
047import org.apache.hbase.thirdparty.com.google.protobuf.Message;
048import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
049
050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
058
059/**
060 * Base class for rpc based connection registry implementation.
061 * <p/>
062 * The implementation needs a bootstrap node list in configuration, and then it will use the methods
063 * in {@link ClientMetaService} to refresh the connection registry end points.
064 * <p/>
065 * It also supports hedged reads, the default fan out value is 2.
066 * <p/>
067 * For the actual configuration names, see javadoc of sub classes.
068 */
069@InterfaceAudience.Private
070abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {
071
072  /** Default value for the fan out of hedged requests. **/
073  public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;
074
075  private final int hedgedReadFanOut;
076
077  // RPC client used to talk to the masters.
078  private final ConnectionRegistryRpcStubHolder rpcStubHolder;
079  private final RpcControllerFactory rpcControllerFactory;
080
081  private final RegistryEndpointsRefresher registryEndpointRefresher;
082
083  protected AbstractRpcBasedConnectionRegistry(Configuration conf, User user,
084    String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
085    String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
086    throws IOException {
087    this.hedgedReadFanOut =
088      Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
089    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
090    rpcStubHolder = new ConnectionRegistryRpcStubHolder(conf, user, rpcControllerFactory,
091      getBootstrapNodes(conf));
092    // could return null here is refresh interval is less than zero
093    registryEndpointRefresher =
094      RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
095        refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
096  }
097
098  protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
099
100  protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();
101
102  private void refreshStubs() throws IOException {
103    rpcStubHolder.refreshStubs(() -> FutureUtils.get(fetchEndpoints()));
104  }
105
106  /**
107   * For describing the actual asynchronous rpc call.
108   * <p/>
109   * Typically, you can use lambda expression to implement this interface as
110   *
111   * <pre>
112   * (c, s, d) -&gt; s.xxx(c, your request here, d)
113   * </pre>
114   */
115  @FunctionalInterface
116  protected interface Callable<T> {
117    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
118  }
119
120  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
121    Callable<T> callable) {
122    HBaseRpcController controller = rpcControllerFactory.newController();
123    CompletableFuture<T> future = new CompletableFuture<>();
124    callable.call(controller, stub, resp -> {
125      if (controller.failed()) {
126        IOException failureReason = controller.getFailed();
127        future.completeExceptionally(failureReason);
128        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
129          // RPC has failed, trigger a refresh of end points. We can have some spurious
130          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
131          registryEndpointRefresher.refreshNow();
132        }
133      } else {
134        future.complete(resp);
135      }
136    });
137    return future;
138  }
139
140  private IOException badResponse(String debug) {
141    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
142  }
143
144  /**
145   * send requests concurrently to hedgedReadsFanout end points. If any of the request is succeeded,
146   * we will complete the future and quit. If all the requests in one round are failed, we will
147   * start another round to send requests concurrently tohedgedReadsFanout end points. If all end
148   * points have been tried and all of them are failed, we will fail the future.
149   */
150  private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
151    List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
152    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
153    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
154    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
155    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
156      addListener(call(stubs.get(i), callable), (r, e) -> {
157        // a simple check to skip all the later operations earlier
158        if (future.isDone()) {
159          return;
160        }
161        if (e == null && !isValidResp.test(r)) {
162          e = badResponse(debug);
163        }
164        if (e != null) {
165          // make sure when remaining reaches 0 we have all exceptions in the errors queue
166          errors.add(e);
167          if (remaining.decrementAndGet() == 0) {
168            if (endIndexExclusive == stubs.size()) {
169              // we are done, complete the future with exception
170              RetriesExhaustedException ex =
171                new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
172              future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
173            } else {
174              groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
175                errors);
176            }
177          }
178        } else {
179          // do not need to decrement the counter any more as we have already finished the future.
180          future.complete(r);
181        }
182      });
183    }
184  }
185
186  protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
187    Predicate<T> isValidResp, String debug) {
188    CompletableFuture<T> future = new CompletableFuture<>();
189    FutureUtils.addListener(rpcStubHolder.getStubs(), (addr2Stub, error) -> {
190      if (error != null) {
191        future.completeExceptionally(error);
192        return;
193      }
194      Set<ServerName> servers = addr2Stub.keySet();
195      List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2Stub.values());
196      Collections.shuffle(stubs, ThreadLocalRandom.current());
197      groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
198        new ConcurrentLinkedQueue<>());
199    });
200    return future;
201  }
202
203  @RestrictedApi(explanation = "Should only be called in tests", link = "",
204      allowedOnPath = ".*/src/test/.*")
205  Set<ServerName> getParsedServers() throws IOException {
206    return FutureUtils.get(rpcStubHolder.getStubs()).keySet();
207  }
208
209  /**
210   * Simple helper to transform the result of getMetaRegionLocations() rpc.
211   */
212  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
213    List<HRegionLocation> regionLocations = new ArrayList<>();
214    resp.getMetaLocationsList()
215      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
216    return new RegionLocations(regionLocations);
217  }
218
219  @Override
220  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
221    return tracedFuture(
222      () -> this
223        .<GetMetaRegionLocationsResponse> call(
224          (c, s, d) -> s.getMetaRegionLocations(c,
225            GetMetaRegionLocationsRequest.getDefaultInstance(), d),
226          r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
227        .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations),
228      getClass().getSimpleName() + ".getMetaRegionLocations");
229  }
230
231  @Override
232  public CompletableFuture<String> getClusterId() {
233    return tracedFuture(
234      () -> this
235        .<GetClusterIdResponse> call(
236          (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
237          GetClusterIdResponse::hasClusterId, "getClusterId()")
238        .thenApply(GetClusterIdResponse::getClusterId),
239      getClass().getSimpleName() + ".getClusterId");
240  }
241
242  @Override
243  public CompletableFuture<ServerName> getActiveMaster() {
244    return tracedFuture(
245      () -> this
246        .<GetActiveMasterResponse> call(
247          (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
248          GetActiveMasterResponse::hasServerName, "getActiveMaster()")
249        .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())),
250      getClass().getSimpleName() + ".getActiveMaster");
251  }
252
253  @Override
254  public void close() {
255    trace(() -> {
256      if (registryEndpointRefresher != null) {
257        registryEndpointRefresher.stop();
258      }
259      if (rpcStubHolder != null) {
260        rpcStubHolder.close();
261      }
262    }, getClass().getSimpleName() + ".close");
263  }
264}