001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
022import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
023import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
024
025import java.io.IOException;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicReference;
032import org.apache.commons.io.IOUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.MasterNotRunningException;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.RpcClient;
038import org.apache.hadoop.hbase.ipc.RpcClientFactory;
039import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
040import org.apache.hadoop.hbase.security.User;
041import org.apache.hadoop.hbase.util.CollectionUtils;
042import org.apache.hadoop.hbase.util.Threads;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
048import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
049
050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
054
055/**
056 * The implementation of AsyncConnection.
057 */
058@InterfaceAudience.Private
059class AsyncConnectionImpl implements AsyncConnection {
060
061  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
062
063  @VisibleForTesting
064  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
065    Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
066
067  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
068
069  private final Configuration conf;
070
071  final AsyncConnectionConfiguration connConf;
072
073  private final User user;
074
075  final AsyncRegistry registry;
076
077  private final int rpcTimeout;
078
079  private final RpcClient rpcClient;
080
081  final RpcControllerFactory rpcControllerFactory;
082
083  private final boolean hostnameCanChange;
084
085  private final AsyncRegionLocator locator;
086
087  final AsyncRpcRetryingCallerFactory callerFactory;
088
089  private final NonceGenerator nonceGenerator;
090
091  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
092  private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
093
094  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
095
096  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
097    new AtomicReference<>();
098
099  public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
100      User user) {
101    this.conf = conf;
102    this.user = user;
103    this.connConf = new AsyncConnectionConfiguration(conf);
104    this.registry = registry;
105    this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
106    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
107    this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
108    this.rpcTimeout =
109      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
110    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
111    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
112    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
113      nonceGenerator = PerClientRandomNonceGenerator.get();
114    } else {
115      nonceGenerator = NO_NONCE_GENERATOR;
116    }
117  }
118
119  @Override
120  public Configuration getConfiguration() {
121    return conf;
122  }
123
124  @Override
125  public void close() {
126    IOUtils.closeQuietly(rpcClient);
127    IOUtils.closeQuietly(registry);
128  }
129
130  @Override
131  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
132    return new AsyncTableRegionLocatorImpl(tableName, locator);
133  }
134
135  // we will override this method for testing retry caller, so do not remove this method.
136  AsyncRegionLocator getLocator() {
137    return locator;
138  }
139
140  // ditto
141  @VisibleForTesting
142  public NonceGenerator getNonceGenerator() {
143    return nonceGenerator;
144  }
145
146  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
147    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
148  }
149
150  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
151    return CollectionUtils.computeIfAbsentEx(rsStubs,
152      getStubKey(ClientService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
153      () -> createRegionServerStub(serverName));
154  }
155
156  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
157    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
158  }
159
160  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
161    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
162  }
163
164  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
165    return CollectionUtils.computeIfAbsentEx(adminSubs,
166      getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
167      () -> createAdminServerStub(serverName));
168  }
169
170  CompletableFuture<MasterService.Interface> getMasterStub() {
171    return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
172      CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
173      addListener(registry.getMasterAddress(), (addr, error) -> {
174        if (error != null) {
175          future.completeExceptionally(error);
176        } else if (addr == null) {
177          future.completeExceptionally(new MasterNotRunningException(
178            "ZooKeeper available but no active master location found"));
179        } else {
180          LOG.debug("The fetched master address is {}", addr);
181          try {
182            future.complete(createMasterStub(addr));
183          } catch (IOException e) {
184            future.completeExceptionally(e);
185          }
186        }
187
188      });
189      return future;
190    }, stub -> true, "master stub");
191  }
192
193  void clearMasterStubCache(MasterService.Interface stub) {
194    masterStub.compareAndSet(stub, null);
195  }
196
197  @Override
198  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
199    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
200
201      @Override
202      public AsyncTable<AdvancedScanResultConsumer> build() {
203        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
204      }
205    };
206  }
207
208  @Override
209  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
210      ExecutorService pool) {
211    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
212
213      @Override
214      public AsyncTable<ScanResultConsumer> build() {
215        RawAsyncTableImpl rawTable =
216          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
217        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
218      }
219    };
220  }
221
222  @Override
223  public AsyncAdminBuilder getAdminBuilder() {
224    return new AsyncAdminBuilderBase(connConf) {
225      @Override
226      public AsyncAdmin build() {
227        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
228      }
229    };
230  }
231
232  @Override
233  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
234    return new AsyncAdminBuilderBase(connConf) {
235      @Override
236      public AsyncAdmin build() {
237        RawAsyncHBaseAdmin rawAdmin =
238          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
239        return new AsyncHBaseAdmin(rawAdmin, pool);
240      }
241    };
242  }
243
244  @Override
245  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
246    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
247  }
248
249  @Override
250  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
251      ExecutorService pool) {
252    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
253      RETRY_TIMER);
254  }
255
256  @Override
257  public CompletableFuture<Hbck> getHbck() {
258    CompletableFuture<Hbck> future = new CompletableFuture<>();
259    addListener(registry.getMasterAddress(), (sn, error) -> {
260      if (error != null) {
261        future.completeExceptionally(error);
262      } else {
263        try {
264          future.complete(getHbck(sn));
265        } catch (IOException e) {
266          future.completeExceptionally(e);
267        }
268      }
269    });
270    return future;
271  }
272
273  @Override
274  public Hbck getHbck(ServerName masterServer) throws IOException {
275    // we will not create a new connection when creating a new protobuf stub, and for hbck there
276    // will be no performance consideration, so for simplification we will create a new stub every
277    // time instead of caching the stub here.
278    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
279      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
280  }
281}