001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
028import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
029import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
030
031import io.opentelemetry.api.trace.Span;
032import java.io.IOException;
033import java.net.SocketAddress;
034import java.util.Collections;
035import java.util.Map;
036import java.util.Optional;
037import java.util.concurrent.CompletableFuture;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.ConcurrentMap;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.ExecutorService;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.atomic.AtomicBoolean;
044import java.util.concurrent.atomic.AtomicReference;
045import org.apache.commons.io.IOUtils;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.hbase.AuthUtil;
048import org.apache.hadoop.hbase.ChoreService;
049import org.apache.hadoop.hbase.MasterNotRunningException;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
053import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
054import org.apache.hadoop.hbase.ipc.RpcClient;
055import org.apache.hadoop.hbase.ipc.RpcClientFactory;
056import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
057import org.apache.hadoop.hbase.security.User;
058import org.apache.hadoop.hbase.trace.TraceUtil;
059import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
060import org.apache.hadoop.hbase.util.Threads;
061import org.apache.hadoop.security.UserGroupInformation;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
067import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
068
069import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
073
074/**
075 * The implementation of AsyncConnection.
076 */
077@InterfaceAudience.Private
078public class AsyncConnectionImpl implements AsyncConnection {
079
080  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
081
082  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
083    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
084      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
085    10, TimeUnit.MILLISECONDS);
086
087  private final Configuration conf;
088
089  final AsyncConnectionConfiguration connConf;
090
091  protected final User user;
092
093  final ConnectionRegistry registry;
094
095  protected final int rpcTimeout;
096
097  protected final RpcClient rpcClient;
098
099  final RpcControllerFactory rpcControllerFactory;
100
101  private final AsyncRegionLocator locator;
102
103  final AsyncRpcRetryingCallerFactory callerFactory;
104
105  private final NonceGenerator nonceGenerator;
106
107  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
108  private final ConcurrentMap<String, AdminService.Interface> adminStubs =
109    new ConcurrentHashMap<>();
110
111  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
112
113  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
114    new AtomicReference<>();
115
116  private final Optional<ServerStatisticTracker> stats;
117  private final ClientBackoffPolicy backoffPolicy;
118
119  private ChoreService choreService;
120
121  private final AtomicBoolean closed = new AtomicBoolean(false);
122
123  private final String metricsScope;
124  private final Optional<MetricsConnection> metrics;
125
126  private final ClusterStatusListener clusterStatusListener;
127
128  private volatile ConnectionOverAsyncConnection conn;
129
130  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
131    SocketAddress localAddress, User user) {
132    this(conf, registry, clusterId, localAddress, user, Collections.emptyMap());
133  }
134
135  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
136    SocketAddress localAddress, User user, Map<String, byte[]> connectionAttributes) {
137    this.conf = conf;
138    this.user = user;
139    this.metricsScope = MetricsConnection.getScope(conf, clusterId, this);
140
141    if (user.isLoginFromKeytab()) {
142      spawnRenewalChore(user.getUGI());
143    }
144    this.connConf = new AsyncConnectionConfiguration(conf);
145    this.registry = registry;
146    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
147      this.metrics = Optional
148        .of(MetricsConnection.getMetricsConnection(conf, metricsScope, () -> null, () -> null));
149    } else {
150      this.metrics = Optional.empty();
151    }
152    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress,
153      metrics.orElse(null), connectionAttributes);
154    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
155    this.rpcTimeout =
156      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
157    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
158    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
159    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
160      nonceGenerator = PerClientRandomNonceGenerator.get();
161    } else {
162      nonceGenerator = NO_NONCE_GENERATOR;
163    }
164    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
165    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
166    ClusterStatusListener listener = null;
167    if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
168      // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
169      // it in, just like clusterId. Not a big problem for now as the default value is false.
170      Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
171        STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
172      if (listenerClass == null) {
173        LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
174      } else {
175        try {
176          listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
177            @Override
178            public void newDead(ServerName sn) {
179              locator.clearCache(sn);
180              rpcClient.cancelConnections(sn);
181            }
182          }, conf, listenerClass);
183        } catch (IOException e) {
184          LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
185        }
186      }
187    }
188    this.clusterStatusListener = listener;
189  }
190
191  private void spawnRenewalChore(final UserGroupInformation user) {
192    ChoreService service = getChoreService();
193    service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf));
194  }
195
196  /**
197   * If choreService has not been created yet, create the ChoreService.
198   */
199  synchronized ChoreService getChoreService() {
200    if (isClosed()) {
201      throw new IllegalStateException("connection is already closed");
202    }
203    if (choreService == null) {
204      choreService = new ChoreService("AsyncConn Chore Service");
205    }
206    return choreService;
207  }
208
209  public User getUser() {
210    return user;
211  }
212
213  public ConnectionRegistry getConnectionRegistry() {
214    return registry;
215  }
216
217  @Override
218  public Configuration getConfiguration() {
219    return conf;
220  }
221
222  @Override
223  public boolean isClosed() {
224    return closed.get();
225  }
226
227  @Override
228  public void close() {
229    TraceUtil.trace(() -> {
230      if (!closed.compareAndSet(false, true)) {
231        return;
232      }
233      LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
234      if (LOG.isDebugEnabled()) {
235        logCallStack(Thread.currentThread().getStackTrace());
236      }
237      IOUtils.closeQuietly(clusterStatusListener,
238        e -> LOG.warn("failed to close clusterStatusListener", e));
239      IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
240      IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
241      synchronized (this) {
242        if (choreService != null) {
243          choreService.shutdown();
244          choreService = null;
245        }
246      }
247      if (metrics.isPresent()) {
248        MetricsConnection.deleteMetricsConnection(metricsScope);
249      }
250      ConnectionOverAsyncConnection c = this.conn;
251      if (c != null) {
252        c.closePool();
253      }
254    }, "AsyncConnection.close");
255  }
256
257  private void logCallStack(StackTraceElement[] stackTraceElements) {
258    StringBuilder stackBuilder = new StringBuilder("Call stack:");
259    for (StackTraceElement element : stackTraceElements) {
260      stackBuilder.append("\n    at ");
261      stackBuilder.append(element);
262    }
263    stackBuilder.append("\n");
264    LOG.debug(stackBuilder.toString());
265  }
266
267  @Override
268  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
269    return new AsyncTableRegionLocatorImpl(tableName, this);
270  }
271
272  @Override
273  public void clearRegionLocationCache() {
274    locator.clearCache();
275  }
276
277  // we will override this method for testing retry caller, so do not remove this method.
278  AsyncRegionLocator getLocator() {
279    return locator;
280  }
281
282  // ditto
283  NonceGenerator getNonceGenerator() {
284    return nonceGenerator;
285  }
286
287  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
288    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
289  }
290
291  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
292    return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
293      getStubKey(ClientService.getDescriptor().getName(), serverName),
294      () -> createRegionServerStub(serverName));
295  }
296
297  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
298    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
299  }
300
301  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
302    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
303  }
304
305  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
306    return ConcurrentMapUtils.computeIfAbsentEx(adminStubs,
307      getStubKey(AdminService.getDescriptor().getName(), serverName),
308      () -> createAdminServerStub(serverName));
309  }
310
311  CompletableFuture<MasterService.Interface> getMasterStub() {
312    return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
313      CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
314      addListener(registry.getActiveMaster(), (addr, error) -> {
315        if (error != null) {
316          future.completeExceptionally(error);
317        } else if (addr == null) {
318          future.completeExceptionally(new MasterNotRunningException(
319            "ZooKeeper available but no active master location found"));
320        } else {
321          LOG.debug("The fetched master address is {}", addr);
322          try {
323            future.complete(createMasterStub(addr));
324          } catch (IOException e) {
325            future.completeExceptionally(e);
326          }
327        }
328
329      });
330      return future;
331    }, stub -> true, "master stub");
332  }
333
334  String getClusterId() {
335    try {
336      return registry.getClusterId().get();
337    } catch (InterruptedException | ExecutionException e) {
338      LOG.error("Error fetching cluster ID: ", e);
339    }
340    return null;
341  }
342
343  void clearMasterStubCache(MasterService.Interface stub) {
344    masterStub.compareAndSet(stub, null);
345  }
346
347  Optional<ServerStatisticTracker> getStatisticsTracker() {
348    return stats;
349  }
350
351  ClientBackoffPolicy getBackoffPolicy() {
352    return backoffPolicy;
353  }
354
355  @Override
356  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
357    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
358
359      @Override
360      public AsyncTable<AdvancedScanResultConsumer> build() {
361        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
362      }
363    };
364  }
365
366  @Override
367  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
368    ExecutorService pool) {
369    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
370
371      @Override
372      public AsyncTable<ScanResultConsumer> build() {
373        RawAsyncTableImpl rawTable =
374          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
375        return new AsyncTableImpl(rawTable, pool);
376      }
377    };
378  }
379
380  @Override
381  public AsyncAdminBuilder getAdminBuilder() {
382    return new AsyncAdminBuilderBase(connConf) {
383      @Override
384      public AsyncAdmin build() {
385        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
386      }
387    };
388  }
389
390  @Override
391  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
392    return new AsyncAdminBuilderBase(connConf) {
393      @Override
394      public AsyncAdmin build() {
395        RawAsyncHBaseAdmin rawAdmin =
396          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
397        return new AsyncHBaseAdmin(rawAdmin, pool);
398      }
399    };
400  }
401
402  @Override
403  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
404    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
405  }
406
407  @Override
408  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
409    ExecutorService pool) {
410    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
411      RETRY_TIMER);
412  }
413
414  @Override
415  public Connection toConnection() {
416    ConnectionOverAsyncConnection c = this.conn;
417    if (c != null) {
418      return c;
419    }
420    synchronized (this) {
421      c = this.conn;
422      if (c != null) {
423        return c;
424      }
425      c = new ConnectionOverAsyncConnection(this);
426      this.conn = c;
427    }
428    return c;
429  }
430
431  private Hbck getHbckInternal(ServerName masterServer) {
432    Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName());
433    // we will not create a new connection when creating a new protobuf stub, and for hbck there
434    // will be no performance consideration, so for simplification we will create a new stub every
435    // time instead of caching the stub here.
436    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
437      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
438  }
439
440  @Override
441  public CompletableFuture<Hbck> getHbck() {
442    return TraceUtil.tracedFuture(() -> {
443      CompletableFuture<Hbck> future = new CompletableFuture<>();
444      addListener(registry.getActiveMaster(), (sn, error) -> {
445        if (error != null) {
446          future.completeExceptionally(error);
447        } else {
448          future.complete(getHbckInternal(sn));
449        }
450      });
451      return future;
452    }, "AsyncConnection.getHbck");
453  }
454
455  @Override
456  public Hbck getHbck(ServerName masterServer) {
457    return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck");
458  }
459
460  Optional<MetricsConnection> getConnectionMetrics() {
461    return metrics;
462  }
463}