001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
028import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
029import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
030
031import io.opentelemetry.api.trace.Span;
032import java.io.IOException;
033import java.util.Optional;
034import java.util.concurrent.CompletableFuture;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.ConcurrentMap;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.ExecutorService;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicReference;
041import org.apache.commons.io.IOUtils;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.hbase.AuthUtil;
044import org.apache.hadoop.hbase.ChoreService;
045import org.apache.hadoop.hbase.MasterNotRunningException;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
049import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
050import org.apache.hadoop.hbase.ipc.RpcClient;
051import org.apache.hadoop.hbase.ipc.RpcClientFactory;
052import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
053import org.apache.hadoop.hbase.security.User;
054import org.apache.hadoop.hbase.trace.TraceUtil;
055import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
056import org.apache.hadoop.hbase.util.Threads;
057import org.apache.hadoop.security.UserGroupInformation;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
063import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
069
070/**
071 * The implementation of AsyncConnection.
072 */
073@InterfaceAudience.Private
074public class AsyncConnectionImpl implements AsyncConnection {
075
  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);

  // Static timer shared by all instances of this class. This class hands it to the region locator,
  // the retrying caller factory and the table/admin/buffered mutator implementations it builds.
  // Its threads are daemon threads named "Async-Client-Retry-Timer-pool-N"; the timer is
  // configured with 10 and TimeUnit.MILLISECONDS (the tick duration, per Netty's constructor).
  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
    10, TimeUnit.MILLISECONDS);

  // The configuration handed to the constructor; returned as-is by getConfiguration().
  private final Configuration conf;

  // Connection level settings, built from conf in the constructor.
  final AsyncConnectionConfiguration connConf;

  // The user passed to the RPC client whenever an RPC channel is created.
  private final User user;

  // Used to look up the active master and the cluster id; closed together with the connection.
  final ConnectionRegistry registry;

  // RPC timeout in milliseconds, derived from connConf.getRpcTimeoutNs() and capped at
  // Integer.MAX_VALUE.
  private final int rpcTimeout;

  // Creates the RPC channels that back every stub handed out by this connection.
  private final RpcClient rpcClient;

  final RpcControllerFactory rpcControllerFactory;

  private final AsyncRegionLocator locator;

  final AsyncRpcRetryingCallerFactory callerFactory;

  // Either the per-client random generator or NO_NONCE_GENERATOR, depending on
  // CLIENT_NONCES_ENABLED_KEY.
  private final NonceGenerator nonceGenerator;

  // Stub caches, keyed by getStubKey(service name, server name).
  // NOTE(review): "adminSubs" looks like a typo for "adminStubs" -- confirm before renaming.
  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
  private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();

  // The cached master stub; reset to null through clearMasterStubCache.
  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();

  // Passed to ConnectionUtils.getOrFetch together with masterStub; presumably holds the
  // in-flight master stub creation so that concurrent callers can share it -- TODO confirm.
  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
    new AtomicReference<>();

  // Empty when ServerStatisticTracker.create(conf) returns null.
  private final Optional<ServerStatisticTracker> stats;
  private final ClientBackoffPolicy backoffPolicy;

  // Created lazily by getChoreService() and shut down in close(); both accesses hold the monitor
  // of this object.
  private ChoreService choreService;

  // Set to true by close(); volatile so that isClosed() can be called without locking.
  private volatile boolean closed = false;

  // Present only when CLIENT_SIDE_METRICS_ENABLED_KEY is true in conf.
  private final Optional<MetricsConnection> metrics;

  // Null when STATUS_PUBLISHED is false, when no listener class is configured, or when creating
  // the listener failed with an IOException.
  private final ClusterStatusListener clusterStatusListener;
121
122  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
123    User user) {
124    this.conf = conf;
125    this.user = user;
126
127    if (user.isLoginFromKeytab()) {
128      spawnRenewalChore(user.getUGI());
129    }
130    this.connConf = new AsyncConnectionConfiguration(conf);
131    this.registry = registry;
132    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
133      String scope = MetricsConnection.getScope(conf, clusterId, this);
134      this.metrics = Optional.of(new MetricsConnection(scope, () -> null, () -> null));
135    } else {
136      this.metrics = Optional.empty();
137    }
138    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
139    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
140    this.rpcTimeout =
141      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
142    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
143    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
144    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
145      nonceGenerator = PerClientRandomNonceGenerator.get();
146    } else {
147      nonceGenerator = NO_NONCE_GENERATOR;
148    }
149    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
150    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
151    ClusterStatusListener listener = null;
152    if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
153      // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
154      // it in, just like clusterId. Not a big problem for now as the default value is false.
155      Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
156        STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
157      if (listenerClass == null) {
158        LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
159      } else {
160        try {
161          listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
162            @Override
163            public void newDead(ServerName sn) {
164              locator.clearCache(sn);
165              rpcClient.cancelConnections(sn);
166            }
167          }, conf, listenerClass);
168        } catch (IOException e) {
169          LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
170        }
171      }
172    }
173    this.clusterStatusListener = listener;
174  }
175
176  private void spawnRenewalChore(final UserGroupInformation user) {
177    ChoreService service = getChoreService();
178    service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf));
179  }
180
181  /**
182   * If choreService has not been created yet, create the ChoreService. n
183   */
184  synchronized ChoreService getChoreService() {
185    if (isClosed()) {
186      throw new IllegalStateException("connection is already closed");
187    }
188    if (choreService == null) {
189      choreService = new ChoreService("AsyncConn Chore Service");
190    }
191    return choreService;
192  }
193
194  public User getUser() {
195    return user;
196  }
197
198  public ConnectionRegistry getConnectionRegistry() {
199    return registry;
200  }
201
202  @Override
203  public Configuration getConfiguration() {
204    return conf;
205  }
206
207  @Override
208  public void close() {
209    TraceUtil.trace(() -> {
210      // As the code below is safe to be executed in parallel, here we do not use CAS or lock,
211      // just a simple volatile flag.
212      if (closed) {
213        return;
214      }
215      LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
216      if (LOG.isDebugEnabled()) {
217        logCallStack(Thread.currentThread().getStackTrace());
218      }
219      IOUtils.closeQuietly(clusterStatusListener,
220        e -> LOG.warn("failed to close clusterStatusListener", e));
221      IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
222      IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
223      synchronized (this) {
224        if (choreService != null) {
225          choreService.shutdown();
226          choreService = null;
227        }
228      }
229      metrics.ifPresent(MetricsConnection::shutdown);
230      closed = true;
231    }, "AsyncConnection.close");
232  }
233
234  private void logCallStack(StackTraceElement[] stackTraceElements) {
235    StringBuilder stackBuilder = new StringBuilder("Call stack:");
236    for (StackTraceElement element : stackTraceElements) {
237      stackBuilder.append("\n    at ");
238      stackBuilder.append(element);
239    }
240    stackBuilder.append("\n");
241    LOG.debug(stackBuilder.toString());
242  }
243
244  @Override
245  public boolean isClosed() {
246    return closed;
247  }
248
249  @Override
250  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
251    return new AsyncTableRegionLocatorImpl(tableName, this);
252  }
253
254  // we will override this method for testing retry caller, so do not remove this method.
255  AsyncRegionLocator getLocator() {
256    return locator;
257  }
258
259  // ditto
260  public NonceGenerator getNonceGenerator() {
261    return nonceGenerator;
262  }
263
264  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
265    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
266  }
267
268  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
269    return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
270      getStubKey(ClientService.getDescriptor().getName(), serverName),
271      () -> createRegionServerStub(serverName));
272  }
273
274  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
275    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
276  }
277
278  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
279    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
280  }
281
282  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
283    return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
284      getStubKey(AdminService.getDescriptor().getName(), serverName),
285      () -> createAdminServerStub(serverName));
286  }
287
288  CompletableFuture<MasterService.Interface> getMasterStub() {
289    return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
290      CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
291      addListener(registry.getActiveMaster(), (addr, error) -> {
292        if (error != null) {
293          future.completeExceptionally(error);
294        } else if (addr == null) {
295          future.completeExceptionally(new MasterNotRunningException(
296            "ZooKeeper available but no active master location found"));
297        } else {
298          LOG.debug("The fetched master address is {}", addr);
299          try {
300            future.complete(createMasterStub(addr));
301          } catch (IOException e) {
302            future.completeExceptionally(e);
303          }
304        }
305
306      });
307      return future;
308    }, stub -> true, "master stub");
309  }
310
311  String getClusterId() {
312    try {
313      return registry.getClusterId().get();
314    } catch (InterruptedException | ExecutionException e) {
315      LOG.error("Error fetching cluster ID: ", e);
316    }
317    return null;
318  }
319
320  void clearMasterStubCache(MasterService.Interface stub) {
321    masterStub.compareAndSet(stub, null);
322  }
323
324  Optional<ServerStatisticTracker> getStatisticsTracker() {
325    return stats;
326  }
327
328  ClientBackoffPolicy getBackoffPolicy() {
329    return backoffPolicy;
330  }
331
332  @Override
333  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
334    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
335
336      @Override
337      public AsyncTable<AdvancedScanResultConsumer> build() {
338        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
339      }
340    };
341  }
342
343  @Override
344  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
345    ExecutorService pool) {
346    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
347
348      @Override
349      public AsyncTable<ScanResultConsumer> build() {
350        RawAsyncTableImpl rawTable =
351          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
352        return new AsyncTableImpl(rawTable, pool);
353      }
354    };
355  }
356
357  @Override
358  public AsyncAdminBuilder getAdminBuilder() {
359    return new AsyncAdminBuilderBase(connConf) {
360      @Override
361      public AsyncAdmin build() {
362        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
363      }
364    };
365  }
366
367  @Override
368  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
369    return new AsyncAdminBuilderBase(connConf) {
370      @Override
371      public AsyncAdmin build() {
372        RawAsyncHBaseAdmin rawAdmin =
373          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
374        return new AsyncHBaseAdmin(rawAdmin, pool);
375      }
376    };
377  }
378
379  @Override
380  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
381    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
382  }
383
384  @Override
385  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
386    ExecutorService pool) {
387    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
388      RETRY_TIMER);
389  }
390
391  private Hbck getHbckInternal(ServerName masterServer) {
392    Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName());
393    // we will not create a new connection when creating a new protobuf stub, and for hbck there
394    // will be no performance consideration, so for simplification we will create a new stub every
395    // time instead of caching the stub here.
396    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
397      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
398  }
399
400  @Override
401  public CompletableFuture<Hbck> getHbck() {
402    return TraceUtil.tracedFuture(() -> {
403      CompletableFuture<Hbck> future = new CompletableFuture<>();
404      addListener(registry.getActiveMaster(), (sn, error) -> {
405        if (error != null) {
406          future.completeExceptionally(error);
407        } else {
408          future.complete(getHbckInternal(sn));
409        }
410      });
411      return future;
412    }, "AsyncConnection.getHbck");
413  }
414
415  @Override
416  public Hbck getHbck(ServerName masterServer) {
417    return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck");
418  }
419
420  @Override
421  public void clearRegionLocationCache() {
422    locator.clearCache();
423  }
424
425  Optional<MetricsConnection> getConnectionMetrics() {
426    return metrics;
427  }
428}