001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
028import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
029import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
030
031import io.opentelemetry.api.trace.Span;
032import java.io.IOException;
033import java.net.SocketAddress;
034import java.util.Optional;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.ConcurrentMap;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.ExecutorService;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicReference;
043import org.apache.commons.io.IOUtils;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.hbase.AuthUtil;
046import org.apache.hadoop.hbase.ChoreService;
047import org.apache.hadoop.hbase.MasterNotRunningException;
048import org.apache.hadoop.hbase.ServerName;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
051import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
052import org.apache.hadoop.hbase.ipc.RpcClient;
053import org.apache.hadoop.hbase.ipc.RpcClientFactory;
054import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
055import org.apache.hadoop.hbase.security.User;
056import org.apache.hadoop.hbase.trace.TraceUtil;
057import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
058import org.apache.hadoop.hbase.util.Threads;
059import org.apache.hadoop.security.UserGroupInformation;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.slf4j.Logger;
062import org.slf4j.LoggerFactory;
063
064import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
065import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
066
067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
071
072/**
073 * The implementation of AsyncConnection.
074 */
075@InterfaceAudience.Private
076public class AsyncConnectionImpl implements AsyncConnection {
077
078  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
079
080  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
081    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
082      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
083    10, TimeUnit.MILLISECONDS);
084
085  private final Configuration conf;
086
087  final AsyncConnectionConfiguration connConf;
088
089  protected final User user;
090
091  final ConnectionRegistry registry;
092
093  protected final int rpcTimeout;
094
095  protected final RpcClient rpcClient;
096
097  final RpcControllerFactory rpcControllerFactory;
098
099  private final AsyncRegionLocator locator;
100
101  final AsyncRpcRetryingCallerFactory callerFactory;
102
103  private final NonceGenerator nonceGenerator;
104
105  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
106  private final ConcurrentMap<String, AdminService.Interface> adminStubs =
107    new ConcurrentHashMap<>();
108
109  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
110
111  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
112    new AtomicReference<>();
113
114  private final Optional<ServerStatisticTracker> stats;
115  private final ClientBackoffPolicy backoffPolicy;
116
117  private ChoreService choreService;
118
119  private final AtomicBoolean closed = new AtomicBoolean(false);
120
121  private final String metricsScope;
122  private final Optional<MetricsConnection> metrics;
123
124  private final ClusterStatusListener clusterStatusListener;
125
126  private volatile ConnectionOverAsyncConnection conn;
127
128  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
129    SocketAddress localAddress, User user) {
130    this.conf = conf;
131    this.user = user;
132    this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
133
134    if (user.isLoginFromKeytab()) {
135      spawnRenewalChore(user.getUGI());
136    }
137    this.connConf = new AsyncConnectionConfiguration(conf);
138    this.registry = registry;
139    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
140      this.metrics =
141        Optional.of(MetricsConnection.getMetricsConnection(metricsScope, () -> null, () -> null));
142    } else {
143      this.metrics = Optional.empty();
144    }
145    this.rpcClient =
146      RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
147    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
148    this.rpcTimeout =
149      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
150    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
151    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
152    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
153      nonceGenerator = PerClientRandomNonceGenerator.get();
154    } else {
155      nonceGenerator = NO_NONCE_GENERATOR;
156    }
157    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
158    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
159    ClusterStatusListener listener = null;
160    if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
161      // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
162      // it in, just like clusterId. Not a big problem for now as the default value is false.
163      Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
164        STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
165      if (listenerClass == null) {
166        LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
167      } else {
168        try {
169          listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
170            @Override
171            public void newDead(ServerName sn) {
172              locator.clearCache(sn);
173              rpcClient.cancelConnections(sn);
174            }
175          }, conf, listenerClass);
176        } catch (IOException e) {
177          LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
178        }
179      }
180    }
181    this.clusterStatusListener = listener;
182  }
183
184  private void spawnRenewalChore(final UserGroupInformation user) {
185    ChoreService service = getChoreService();
186    service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf));
187  }
188
189  /**
190   * If choreService has not been created yet, create the ChoreService.
191   */
192  synchronized ChoreService getChoreService() {
193    if (isClosed()) {
194      throw new IllegalStateException("connection is already closed");
195    }
196    if (choreService == null) {
197      choreService = new ChoreService("AsyncConn Chore Service");
198    }
199    return choreService;
200  }
201
202  public User getUser() {
203    return user;
204  }
205
206  public ConnectionRegistry getConnectionRegistry() {
207    return registry;
208  }
209
210  @Override
211  public Configuration getConfiguration() {
212    return conf;
213  }
214
215  @Override
216  public boolean isClosed() {
217    return closed.get();
218  }
219
220  @Override
221  public void close() {
222    TraceUtil.trace(() -> {
223      if (!closed.compareAndSet(false, true)) {
224        return;
225      }
226      LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
227      if (LOG.isDebugEnabled()) {
228        logCallStack(Thread.currentThread().getStackTrace());
229      }
230      IOUtils.closeQuietly(clusterStatusListener,
231        e -> LOG.warn("failed to close clusterStatusListener", e));
232      IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
233      IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
234      synchronized (this) {
235        if (choreService != null) {
236          choreService.shutdown();
237          choreService = null;
238        }
239      }
240      if (metrics.isPresent()) {
241        MetricsConnection.deleteMetricsConnection(metricsScope);
242      }
243      ConnectionOverAsyncConnection c = this.conn;
244      if (c != null) {
245        c.closePool();
246      }
247    }, "AsyncConnection.close");
248  }
249
250  private void logCallStack(StackTraceElement[] stackTraceElements) {
251    StringBuilder stackBuilder = new StringBuilder("Call stack:");
252    for (StackTraceElement element : stackTraceElements) {
253      stackBuilder.append("\n    at ");
254      stackBuilder.append(element);
255    }
256    stackBuilder.append("\n");
257    LOG.debug(stackBuilder.toString());
258  }
259
260  @Override
261  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
262    return new AsyncTableRegionLocatorImpl(tableName, this);
263  }
264
265  @Override
266  public void clearRegionLocationCache() {
267    locator.clearCache();
268  }
269
270  // we will override this method for testing retry caller, so do not remove this method.
271  AsyncRegionLocator getLocator() {
272    return locator;
273  }
274
275  // ditto
276  NonceGenerator getNonceGenerator() {
277    return nonceGenerator;
278  }
279
280  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
281    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
282  }
283
284  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
285    return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
286      getStubKey(ClientService.getDescriptor().getName(), serverName),
287      () -> createRegionServerStub(serverName));
288  }
289
290  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
291    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
292  }
293
294  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
295    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
296  }
297
298  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
299    return ConcurrentMapUtils.computeIfAbsentEx(adminStubs,
300      getStubKey(AdminService.getDescriptor().getName(), serverName),
301      () -> createAdminServerStub(serverName));
302  }
303
304  CompletableFuture<MasterService.Interface> getMasterStub() {
305    return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
306      CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
307      addListener(registry.getActiveMaster(), (addr, error) -> {
308        if (error != null) {
309          future.completeExceptionally(error);
310        } else if (addr == null) {
311          future.completeExceptionally(new MasterNotRunningException(
312            "ZooKeeper available but no active master location found"));
313        } else {
314          LOG.debug("The fetched master address is {}", addr);
315          try {
316            future.complete(createMasterStub(addr));
317          } catch (IOException e) {
318            future.completeExceptionally(e);
319          }
320        }
321
322      });
323      return future;
324    }, stub -> true, "master stub");
325  }
326
327  String getClusterId() {
328    try {
329      return registry.getClusterId().get();
330    } catch (InterruptedException | ExecutionException e) {
331      LOG.error("Error fetching cluster ID: ", e);
332    }
333    return null;
334  }
335
336  void clearMasterStubCache(MasterService.Interface stub) {
337    masterStub.compareAndSet(stub, null);
338  }
339
340  Optional<ServerStatisticTracker> getStatisticsTracker() {
341    return stats;
342  }
343
344  ClientBackoffPolicy getBackoffPolicy() {
345    return backoffPolicy;
346  }
347
348  @Override
349  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
350    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
351
352      @Override
353      public AsyncTable<AdvancedScanResultConsumer> build() {
354        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
355      }
356    };
357  }
358
359  @Override
360  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
361    ExecutorService pool) {
362    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
363
364      @Override
365      public AsyncTable<ScanResultConsumer> build() {
366        RawAsyncTableImpl rawTable =
367          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
368        return new AsyncTableImpl(rawTable, pool);
369      }
370    };
371  }
372
373  @Override
374  public AsyncAdminBuilder getAdminBuilder() {
375    return new AsyncAdminBuilderBase(connConf) {
376      @Override
377      public AsyncAdmin build() {
378        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
379      }
380    };
381  }
382
383  @Override
384  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
385    return new AsyncAdminBuilderBase(connConf) {
386      @Override
387      public AsyncAdmin build() {
388        RawAsyncHBaseAdmin rawAdmin =
389          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
390        return new AsyncHBaseAdmin(rawAdmin, pool);
391      }
392    };
393  }
394
395  @Override
396  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
397    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
398  }
399
400  @Override
401  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
402    ExecutorService pool) {
403    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
404      RETRY_TIMER);
405  }
406
407  @Override
408  public Connection toConnection() {
409    ConnectionOverAsyncConnection c = this.conn;
410    if (c != null) {
411      return c;
412    }
413    synchronized (this) {
414      c = this.conn;
415      if (c != null) {
416        return c;
417      }
418      c = new ConnectionOverAsyncConnection(this);
419      this.conn = c;
420    }
421    return c;
422  }
423
424  private Hbck getHbckInternal(ServerName masterServer) {
425    Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName());
426    // we will not create a new connection when creating a new protobuf stub, and for hbck there
427    // will be no performance consideration, so for simplification we will create a new stub every
428    // time instead of caching the stub here.
429    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
430      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
431  }
432
433  @Override
434  public CompletableFuture<Hbck> getHbck() {
435    return TraceUtil.tracedFuture(() -> {
436      CompletableFuture<Hbck> future = new CompletableFuture<>();
437      addListener(registry.getActiveMaster(), (sn, error) -> {
438        if (error != null) {
439          future.completeExceptionally(error);
440        } else {
441          future.complete(getHbckInternal(sn));
442        }
443      });
444      return future;
445    }, "AsyncConnection.getHbck");
446  }
447
448  @Override
449  public Hbck getHbck(ServerName masterServer) {
450    return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck");
451  }
452
453  Optional<MetricsConnection> getConnectionMetrics() {
454    return metrics;
455  }
456}