001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import java.util.Set;
024import java.util.concurrent.CompletableFuture;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.ipc.HBaseRpcController;
028import org.apache.hadoop.hbase.ipc.RpcClient;
029import org.apache.hadoop.hbase.ipc.RpcClientFactory;
030import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
031import org.apache.hadoop.hbase.security.User;
032import org.apache.hadoop.hbase.util.FutureUtils;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
038import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
043
044/**
045 * Fetch cluster id through special preamble header.
046 * <p>
047 * An instance of this class should only be used once, like:
048 *
049 * <pre>
050 * new ClusterIdFetcher().fetchClusterId()
051 * </pre>
052 *
053 * Calling the fetchClusterId multiple times will lead unexpected behavior.
054 * <p>
055 * See HBASE-25051 for more details.
056 */
057@InterfaceAudience.Private
058class ClusterIdFetcher {
059
060  private static final Logger LOG = LoggerFactory.getLogger(ClusterIdFetcher.class);
061
062  private final List<ServerName> bootstrapServers;
063
064  private final User user;
065
066  private final RpcClient rpcClient;
067
068  private final RpcControllerFactory rpcControllerFactory;
069
070  private final CompletableFuture<String> future;
071
072  ClusterIdFetcher(Configuration conf, User user, RpcControllerFactory rpcControllerFactory,
073    Set<ServerName> bootstrapServers) {
074    this.user = user;
075    // use null cluster id here as we do not know the cluster id yet, we will fetch it through this
076    // rpc client
077    this.rpcClient = RpcClientFactory.createClient(conf, null);
078    this.rpcControllerFactory = rpcControllerFactory;
079    this.bootstrapServers = new ArrayList<ServerName>(bootstrapServers);
080    // shuffle the bootstrap servers so we will not always fetch from the same one
081    Collections.shuffle(this.bootstrapServers);
082    future = new CompletableFuture<String>();
083  }
084
085  /**
086   * Try get cluster id from the server with the given {@code index} in {@link #bootstrapServers}.
087   */
088  private void getClusterId(int index) {
089    ServerName server = bootstrapServers.get(index);
090    LOG.debug("Going to request {} for getting cluster id", server);
091    // user and rpcTimeout are both not important here, as we will not actually send any rpc calls
092    // out, only a preamble connection header, but if we pass null as user, there will be NPE in
093    // some code paths...
094    RpcChannel channel = rpcClient.createRpcChannel(server, user, 0);
095    ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel);
096    HBaseRpcController controller = rpcControllerFactory.newController();
097    stub.getConnectionRegistry(controller, GetConnectionRegistryRequest.getDefaultInstance(),
098      new RpcCallback<GetConnectionRegistryResponse>() {
099
100        @Override
101        public void run(GetConnectionRegistryResponse resp) {
102          if (!controller.failed()) {
103            LOG.debug("Got connection registry info: {}", resp);
104            future.complete(resp.getClusterId());
105            return;
106          }
107          if (ConnectionUtils.isUnexpectedPreambleHeaderException(controller.getFailed())) {
108            // this means we have connected to an old server where it does not support passing
109            // cluster id through preamble connnection header, so we fallback to use null
110            // cluster id, which is the old behavior
111            LOG.debug("Failed to get connection registry info, should be an old server,"
112              + " fallback to use null cluster id", controller.getFailed());
113            future.complete(null);
114          } else {
115            LOG.debug("Failed to get connection registry info", controller.getFailed());
116            if (index == bootstrapServers.size() - 1) {
117              future.completeExceptionally(controller.getFailed());
118            } else {
119              // try next bootstrap server
120              getClusterId(index + 1);
121            }
122          }
123        }
124      });
125
126  }
127
128  CompletableFuture<String> fetchClusterId() {
129    getClusterId(0);
130    // close the rpc client after we finish the request
131    FutureUtils.addListener(future, (r, e) -> rpcClient.close());
132    return future;
133  }
134}