001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Set;
025import java.util.concurrent.CompletableFuture;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.ipc.RpcClient;
030import org.apache.hadoop.hbase.ipc.RpcClientFactory;
031import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.util.FutureUtils;
034import org.apache.hadoop.hbase.util.IOExceptionSupplier;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
040import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
041
042import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
043
044/**
045 * A class for creating {@link RpcClient} and related stubs used by
046 * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap nodes to get the
047 * cluster id first, before creating the final {@link RpcClient} and related stubs.
048 * <p>
049 * See HBASE-25051 for more details.
050 */
051@InterfaceAudience.Private
052class ConnectionRegistryRpcStubHolder implements Closeable {
053
054  private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class);
055
056  private final Configuration conf;
057
058  // used for getting cluster id
059  private final Configuration noAuthConf;
060
061  private final User user;
062
063  private final RpcControllerFactory rpcControllerFactory;
064
065  private final Set<ServerName> bootstrapNodes;
066
067  private final int rpcTimeoutMs;
068
069  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;
070
071  private volatile RpcClient rpcClient;
072
073  private CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> addr2StubFuture;
074
075  ConnectionRegistryRpcStubHolder(Configuration conf, User user,
076    RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) {
077    this.conf = conf;
078    if (User.isHBaseSecurityEnabled(conf)) {
079      this.noAuthConf = new Configuration(conf);
080      this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
081    } else {
082      this.noAuthConf = conf;
083    }
084    this.user = user;
085    this.rpcControllerFactory = rpcControllerFactory;
086    this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes);
087    this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
088      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
089  }
090
091  private ImmutableMap<ServerName, ClientMetaService.Interface> createStubs(RpcClient rpcClient,
092    Collection<ServerName> addrs) {
093    LOG.debug("Going to use new servers to create stubs: {}", addrs);
094    Preconditions.checkNotNull(addrs);
095    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
096      ImmutableMap.builderWithExpectedSize(addrs.size());
097    for (ServerName masterAddr : addrs) {
098      builder.put(masterAddr,
099        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
100    }
101    return builder.build();
102  }
103
104  private CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>>
105    fetchClusterIdAndCreateStubs() {
106    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> future =
107      new CompletableFuture<>();
108    addr2StubFuture = future;
109    FutureUtils.addListener(
110      new ClusterIdFetcher(noAuthConf, user, rpcControllerFactory, bootstrapNodes).fetchClusterId(),
111      (clusterId, error) -> {
112        synchronized (ConnectionRegistryRpcStubHolder.this) {
113          if (error != null) {
114            addr2StubFuture.completeExceptionally(error);
115          } else {
116            RpcClient c = RpcClientFactory.createClient(conf, clusterId);
117            ImmutableMap<ServerName, ClientMetaService.Interface> m =
118              createStubs(c, bootstrapNodes);
119            rpcClient = c;
120            addr2Stub = m;
121            addr2StubFuture.complete(m);
122          }
123          addr2StubFuture = null;
124        }
125      });
126    // here we must use the local variable future instead of addr2StubFuture, as the above listener
127    // could be executed directly in the same thread(if the future completes quick enough), since
128    // the synchronized lock is reentrant, it could set addr2StubFuture to null in the end, so when
129    // arriving here the addr2StubFuture could be null.
130    return future;
131  }
132
133  CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> getStubs() {
134    ImmutableMap<ServerName, ClientMetaService.Interface> s = this.addr2Stub;
135    if (s != null) {
136      return CompletableFuture.completedFuture(s);
137    }
138    synchronized (this) {
139      s = this.addr2Stub;
140      if (s != null) {
141        return CompletableFuture.completedFuture(s);
142      }
143      if (addr2StubFuture != null) {
144        return addr2StubFuture;
145      }
146      return fetchClusterIdAndCreateStubs();
147    }
148  }
149
150  void refreshStubs(IOExceptionSupplier<Collection<ServerName>> fetchEndpoints) throws IOException {
151    // There is no actual call yet so we have not initialize the rpc client and related stubs yet,
152    // give up refreshing
153    if (addr2Stub == null) {
154      LOG.debug("Skip refreshing stubs as we have not initialized rpc client yet");
155      return;
156    }
157    LOG.debug("Going to refresh stubs");
158    assert rpcClient != null;
159    addr2Stub = createStubs(rpcClient, fetchEndpoints.get());
160  }
161
162  @Override
163  public void close() {
164    if (rpcClient != null) {
165      rpcClient.close();
166    }
167  }
168}