001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
063
064/**
065 * The implementation of AsyncConnection.
066 */
067@InterfaceAudience.Private
068class AsyncConnectionImpl implements AsyncConnection {
069
070  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
071
072  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
073    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
074      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
075    10, TimeUnit.MILLISECONDS);
076
077  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
078
079  private final Configuration conf;
080
081  final AsyncConnectionConfiguration connConf;
082
083  private final User user;
084
085  final ConnectionRegistry registry;
086
087  private final int rpcTimeout;
088
089  private final RpcClient rpcClient;
090
091  final RpcControllerFactory rpcControllerFactory;
092
093  private final boolean hostnameCanChange;
094
095  private final AsyncRegionLocator locator;
096
097  final AsyncRpcRetryingCallerFactory callerFactory;
098
099  private final NonceGenerator nonceGenerator;
100
101  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
102  private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
103
104  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
105
106  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
107    new AtomicReference<>();
108
109  private final Optional<ServerStatisticTracker> stats;
110  private final ClientBackoffPolicy backoffPolicy;
111
112  private ChoreService choreService;
113
114  private volatile boolean closed = false;
115
116  private final Optional<MetricsConnection> metrics;
117
118  private final ClusterStatusListener clusterStatusListener;
119
120  public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
121      User user) {
122    this.conf = conf;
123    this.user = user;
124
125    if (user.isLoginFromKeytab()) {
126      spawnRenewalChore(user.getUGI());
127    }
128    this.connConf = new AsyncConnectionConfiguration(conf);
129    this.registry = registry;
130    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
131      this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
132    } else {
133      this.metrics = Optional.empty();
134    }
135    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
136    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
137    this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
138    this.rpcTimeout =
139      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
140    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
141    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
142    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
143      nonceGenerator = PerClientRandomNonceGenerator.get();
144    } else {
145      nonceGenerator = NO_NONCE_GENERATOR;
146    }
147    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
148    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
149    ClusterStatusListener listener = null;
150    if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
151      // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
152      // it in, just like clusterId. Not a big problem for now as the default value is false.
153      Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
154        STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
155      if (listenerClass == null) {
156        LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
157      } else {
158        try {
159          listener = new ClusterStatusListener(
160            new ClusterStatusListener.DeadServerHandler() {
161              @Override
162              public void newDead(ServerName sn) {
163                locator.clearCache(sn);
164                rpcClient.cancelConnections(sn);
165              }
166            }, conf, listenerClass);
167        } catch (IOException e) {
168          LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
169        }
170      }
171    }
172    this.clusterStatusListener = listener;
173  }
174
175  private void spawnRenewalChore(final UserGroupInformation user) {
176    ChoreService service = getChoreService();
177    service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
178  }
179
180  /**
181   * If choreService has not been created yet, create the ChoreService.
182   * @return ChoreService
183   */
184  synchronized ChoreService getChoreService() {
185    if (choreService == null) {
186      choreService = new ChoreService("AsyncConn Chore Service");
187    }
188    return choreService;
189  }
190
191  @Override
192  public Configuration getConfiguration() {
193    return conf;
194  }
195
196  @Override
197  public void close() {
198    // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
199    // simple volatile flag.
200    if (closed) {
201      return;
202    }
203    LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
204    if(LOG.isDebugEnabled()){
205      logCallStack(Thread.currentThread().getStackTrace());
206    }
207    IOUtils.closeQuietly(clusterStatusListener);
208    IOUtils.closeQuietly(rpcClient);
209    IOUtils.closeQuietly(registry);
210    if (choreService != null) {
211      choreService.shutdown();
212    }
213    metrics.ifPresent(MetricsConnection::shutdown);
214    closed = true;
215  }
216
217  private void logCallStack(StackTraceElement[] stackTraceElements) {
218    StringBuilder stackBuilder = new StringBuilder("Call stack:");
219    for (StackTraceElement element : stackTraceElements) {
220      stackBuilder.append("\n    at ");
221      stackBuilder.append(element);
222    }
223    stackBuilder.append("\n");
224    LOG.debug(stackBuilder.toString());
225  }
226
227  @Override
228  public boolean isClosed() {
229    return closed;
230  }
231
232  @Override
233  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
234    return new AsyncTableRegionLocatorImpl(tableName, this);
235  }
236
237  // we will override this method for testing retry caller, so do not remove this method.
238  AsyncRegionLocator getLocator() {
239    return locator;
240  }
241
242  // ditto
243  public NonceGenerator getNonceGenerator() {
244    return nonceGenerator;
245  }
246
247  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
248    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
249  }
250
251  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
252    return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
253      getStubKey(ClientService.getDescriptor().getName(), serverName, hostnameCanChange),
254      () -> createRegionServerStub(serverName));
255  }
256
257  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
258    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
259  }
260
261  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
262    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
263  }
264
265  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
266    return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
267      getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange),
268      () -> createAdminServerStub(serverName));
269  }
270
271  CompletableFuture<MasterService.Interface> getMasterStub() {
272    return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
273      CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
274      addListener(registry.getActiveMaster(), (addr, error) -> {
275        if (error != null) {
276          future.completeExceptionally(error);
277        } else if (addr == null) {
278          future.completeExceptionally(new MasterNotRunningException(
279            "ZooKeeper available but no active master location found"));
280        } else {
281          LOG.debug("The fetched master address is {}", addr);
282          try {
283            future.complete(createMasterStub(addr));
284          } catch (IOException e) {
285            future.completeExceptionally(e);
286          }
287        }
288
289      });
290      return future;
291    }, stub -> true, "master stub");
292  }
293
294  void clearMasterStubCache(MasterService.Interface stub) {
295    masterStub.compareAndSet(stub, null);
296  }
297
298  Optional<ServerStatisticTracker> getStatisticsTracker() {
299    return stats;
300  }
301
302  ClientBackoffPolicy getBackoffPolicy() {
303    return backoffPolicy;
304  }
305
306  @Override
307  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
308    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
309
310      @Override
311      public AsyncTable<AdvancedScanResultConsumer> build() {
312        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
313      }
314    };
315  }
316
317  @Override
318  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
319      ExecutorService pool) {
320    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
321
322      @Override
323      public AsyncTable<ScanResultConsumer> build() {
324        RawAsyncTableImpl rawTable =
325          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
326        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
327      }
328    };
329  }
330
331  @Override
332  public AsyncAdminBuilder getAdminBuilder() {
333    return new AsyncAdminBuilderBase(connConf) {
334      @Override
335      public AsyncAdmin build() {
336        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
337      }
338    };
339  }
340
341  @Override
342  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
343    return new AsyncAdminBuilderBase(connConf) {
344      @Override
345      public AsyncAdmin build() {
346        RawAsyncHBaseAdmin rawAdmin =
347          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
348        return new AsyncHBaseAdmin(rawAdmin, pool);
349      }
350    };
351  }
352
353  @Override
354  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
355    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
356  }
357
358  @Override
359  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
360      ExecutorService pool) {
361    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
362      RETRY_TIMER);
363  }
364
365  @Override
366  public CompletableFuture<Hbck> getHbck() {
367    CompletableFuture<Hbck> future = new CompletableFuture<>();
368    addListener(registry.getActiveMaster(), (sn, error) -> {
369      if (error != null) {
370        future.completeExceptionally(error);
371      } else {
372        try {
373          future.complete(getHbck(sn));
374        } catch (IOException e) {
375          future.completeExceptionally(e);
376        }
377      }
378    });
379    return future;
380  }
381
382  @Override
383  public Hbck getHbck(ServerName masterServer) throws IOException {
384    // we will not create a new connection when creating a new protobuf stub, and for hbck there
385    // will be no performance consideration, so for simplification we will create a new stub every
386    // time instead of caching the stub here.
387    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
388      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
389  }
390
391  @Override
392  public void clearRegionLocationCache() {
393    locator.clearCache();
394  }
395
396  Optional<MetricsConnection> getConnectionMetrics() {
397    return metrics;
398  }
399}