001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
028import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
029
030import java.io.IOException;
031import java.net.SocketAddress;
032import java.util.Optional;
033import java.util.concurrent.CompletableFuture;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.ExecutionException;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.concurrent.atomic.AtomicReference;
041import org.apache.commons.io.IOUtils;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.hbase.AuthUtil;
044import org.apache.hadoop.hbase.ChoreService;
045import org.apache.hadoop.hbase.MasterNotRunningException;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
049import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
050import org.apache.hadoop.hbase.ipc.RpcClient;
051import org.apache.hadoop.hbase.ipc.RpcClientFactory;
052import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
053import org.apache.hadoop.hbase.security.User;
054import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
055import org.apache.hadoop.hbase.util.Threads;
056import org.apache.hadoop.security.UserGroupInformation;
057import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
063import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
064
065import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
069
070/**
071 * The implementation of AsyncConnection.
072 */
073@InterfaceAudience.Private
074class AsyncConnectionImpl implements AsyncConnection {
075
076  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
077
078  @VisibleForTesting
079  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
080    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
081      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
082    10, TimeUnit.MILLISECONDS);
083
084  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
085
086  private final Configuration conf;
087
088  final AsyncConnectionConfiguration connConf;
089
090  private final User user;
091
092  final ConnectionRegistry registry;
093
094  private final int rpcTimeout;
095
096  protected final RpcClient rpcClient;
097
098  final RpcControllerFactory rpcControllerFactory;
099
100  private final boolean hostnameCanChange;
101
102  private final AsyncRegionLocator locator;
103
104  final AsyncRpcRetryingCallerFactory callerFactory;
105
106  private final NonceGenerator nonceGenerator;
107
108  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
109  private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
110
111  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
112
113  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
114    new AtomicReference<>();
115
116  private final Optional<ServerStatisticTracker> stats;
117  private final ClientBackoffPolicy backoffPolicy;
118
119  private ChoreService authService;
120
121  private final AtomicBoolean closed = new AtomicBoolean(false);
122
123  private final Optional<MetricsConnection> metrics;
124
125  private final ClusterStatusListener clusterStatusListener;
126
127  private volatile ConnectionOverAsyncConnection conn;
128
129  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
130      SocketAddress localAddress, User user) {
131    this.conf = conf;
132    this.user = user;
133    if (user.isLoginFromKeytab()) {
134      spawnRenewalChore(user.getUGI());
135    }
136    this.connConf = new AsyncConnectionConfiguration(conf);
137    this.registry = registry;
138    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
139      this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
140    } else {
141      this.metrics = Optional.empty();
142    }
143    this.rpcClient = RpcClientFactory.createClient(
144        conf, clusterId, localAddress, metrics.orElse(null));
145    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
146    this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
147    this.rpcTimeout =
148      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
149    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
150    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
151    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
152      nonceGenerator = PerClientRandomNonceGenerator.get();
153    } else {
154      nonceGenerator = NO_NONCE_GENERATOR;
155    }
156    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
157    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
158    ClusterStatusListener listener = null;
159    if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
160      // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
161      // it in, just like clusterId. Not a big problem for now as the default value is false.
162      Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
163        STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
164      if (listenerClass == null) {
165        LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
166      } else {
167        try {
168          listener = new ClusterStatusListener(
169            new ClusterStatusListener.DeadServerHandler() {
170              @Override
171              public void newDead(ServerName sn) {
172                locator.clearCache(sn);
173                rpcClient.cancelConnections(sn);
174              }
175            }, conf, listenerClass);
176        } catch (IOException e) {
177          LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
178        }
179      }
180    }
181    this.clusterStatusListener = listener;
182  }
183
184  private void spawnRenewalChore(final UserGroupInformation user) {
185    authService = new ChoreService("Relogin service");
186    authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
187  }
188
189  @Override
190  public Configuration getConfiguration() {
191    return conf;
192  }
193
194  @Override
195  public boolean isClosed() {
196    return closed.get();
197  }
198
199  @Override
200  public void close() {
201    if (!closed.compareAndSet(false, true)) {
202      return;
203    }
204    LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
205    if(LOG.isDebugEnabled()){
206      logCallStack(Thread.currentThread().getStackTrace());
207    }
208    IOUtils.closeQuietly(clusterStatusListener);
209    IOUtils.closeQuietly(rpcClient);
210    IOUtils.closeQuietly(registry);
211    if (authService != null) {
212      authService.shutdown();
213    }
214    metrics.ifPresent(MetricsConnection::shutdown);
215    ConnectionOverAsyncConnection c = this.conn;
216    if (c != null) {
217      c.closePool();
218    }
219  }
220
221  private void logCallStack(StackTraceElement[] stackTraceElements) {
222    StringBuilder stackBuilder = new StringBuilder("Call stack:");
223    for (StackTraceElement element : stackTraceElements) {
224      stackBuilder.append("\n    at ");
225      stackBuilder.append(element);
226    }
227    stackBuilder.append("\n");
228    LOG.debug(stackBuilder.toString());
229  }
230
231  @Override
232  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
233    return new AsyncTableRegionLocatorImpl(tableName, this);
234  }
235
236  @Override
237  public void clearRegionLocationCache() {
238    locator.clearCache();
239  }
240
241  // we will override this method for testing retry caller, so do not remove this method.
242  AsyncRegionLocator getLocator() {
243    return locator;
244  }
245
246  // ditto
247  NonceGenerator getNonceGenerator() {
248    return nonceGenerator;
249  }
250
251  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
252    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
253  }
254
255  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
256    return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
257      getStubKey(ClientService.getDescriptor().getName(), serverName, hostnameCanChange),
258      () -> createRegionServerStub(serverName));
259  }
260
261  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
262    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
263  }
264
265  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
266    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
267  }
268
269  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
270    return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
271      getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange),
272      () -> createAdminServerStub(serverName));
273  }
274
275  CompletableFuture<MasterService.Interface> getMasterStub() {
276    return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
277      CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
278      addListener(registry.getActiveMaster(), (addr, error) -> {
279        if (error != null) {
280          future.completeExceptionally(error);
281        } else if (addr == null) {
282          future.completeExceptionally(new MasterNotRunningException(
283            "ZooKeeper available but no active master location found"));
284        } else {
285          LOG.debug("The fetched master address is {}", addr);
286          try {
287            future.complete(createMasterStub(addr));
288          } catch (IOException e) {
289            future.completeExceptionally(e);
290          }
291        }
292
293      });
294      return future;
295    }, stub -> true, "master stub");
296  }
297
298  String getClusterId() {
299    try {
300      return registry.getClusterId().get();
301    } catch (InterruptedException | ExecutionException e) {
302      LOG.error("Error fetching cluster ID: ", e);
303    }
304    return null;
305  }
306
307  void clearMasterStubCache(MasterService.Interface stub) {
308    masterStub.compareAndSet(stub, null);
309  }
310
311  Optional<ServerStatisticTracker> getStatisticsTracker() {
312    return stats;
313  }
314
315  ClientBackoffPolicy getBackoffPolicy() {
316    return backoffPolicy;
317  }
318
319  @Override
320  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
321    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
322
323      @Override
324      public AsyncTable<AdvancedScanResultConsumer> build() {
325        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
326      }
327    };
328  }
329
330  @Override
331  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
332      ExecutorService pool) {
333    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
334
335      @Override
336      public AsyncTable<ScanResultConsumer> build() {
337        RawAsyncTableImpl rawTable =
338          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
339        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
340      }
341    };
342  }
343
344  @Override
345  public AsyncAdminBuilder getAdminBuilder() {
346    return new AsyncAdminBuilderBase(connConf) {
347      @Override
348      public AsyncAdmin build() {
349        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
350      }
351    };
352  }
353
354  @Override
355  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
356    return new AsyncAdminBuilderBase(connConf) {
357      @Override
358      public AsyncAdmin build() {
359        RawAsyncHBaseAdmin rawAdmin =
360          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
361        return new AsyncHBaseAdmin(rawAdmin, pool);
362      }
363    };
364  }
365
366  @Override
367  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
368    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
369  }
370
371  @Override
372  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
373      ExecutorService pool) {
374    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
375      RETRY_TIMER);
376  }
377
378  @Override
379  public Connection toConnection() {
380    ConnectionOverAsyncConnection c = this.conn;
381    if (c != null) {
382      return c;
383    }
384    synchronized (this) {
385      c = this.conn;
386      if (c != null) {
387        return c;
388      }
389      c = new ConnectionOverAsyncConnection(this);
390      this.conn = c;
391    }
392    return c;
393  }
394
395  @Override
396  public CompletableFuture<Hbck> getHbck() {
397    CompletableFuture<Hbck> future = new CompletableFuture<>();
398    addListener(registry.getActiveMaster(), (sn, error) -> {
399      if (error != null) {
400        future.completeExceptionally(error);
401      } else {
402        try {
403          future.complete(getHbck(sn));
404        } catch (IOException e) {
405          future.completeExceptionally(e);
406        }
407      }
408    });
409    return future;
410  }
411
412  @Override
413  public Hbck getHbck(ServerName masterServer) throws IOException {
414    // we will not create a new connection when creating a new protobuf stub, and for hbck there
415    // will be no performance consideration, so for simplification we will create a new stub every
416    // time instead of caching the stub here.
417    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
418      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
419  }
420
421  Optional<MetricsConnection> getConnectionMetrics() {
422    return metrics;
423  }
424}