001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
021import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
022import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
023import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
024import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
025import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
026import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
027import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
028import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
029
030import java.io.IOException;
031import java.util.Optional;
032import java.util.concurrent.CompletableFuture;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.ConcurrentMap;
035import java.util.concurrent.ExecutorService;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.atomic.AtomicReference;
038import org.apache.commons.io.IOUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.AuthUtil;
041import org.apache.hadoop.hbase.ChoreService;
042import org.apache.hadoop.hbase.MasterNotRunningException;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
046import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
047import org.apache.hadoop.hbase.ipc.RpcClient;
048import org.apache.hadoop.hbase.ipc.RpcClientFactory;
049import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
052import org.apache.hadoop.hbase.util.Threads;
053import org.apache.hadoop.security.UserGroupInformation;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
065
066/**
067 * The implementation of AsyncConnection.
068 */
069@InterfaceAudience.Private
070class AsyncConnectionImpl implements AsyncConnection {
071
072  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
073
074  @VisibleForTesting
075  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
076    Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
077
078  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
079
080  private final Configuration conf;
081
082  final AsyncConnectionConfiguration connConf;
083
084  private final User user;
085
086  final AsyncRegistry registry;
087
088  private final int rpcTimeout;
089
090  private final RpcClient rpcClient;
091
092  final RpcControllerFactory rpcControllerFactory;
093
094  private final boolean hostnameCanChange;
095
096  private final AsyncRegionLocator locator;
097
098  final AsyncRpcRetryingCallerFactory callerFactory;
099
100  private final NonceGenerator nonceGenerator;
101
102  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
103  private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
104
105  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
106
107  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
108    new AtomicReference<>();
109
110  private final Optional<ServerStatisticTracker> stats;
111  private final ClientBackoffPolicy backoffPolicy;
112
113  private ChoreService authService;
114
115  private volatile boolean closed = false;
116
117  private final Optional<MetricsConnection> metrics;
118
119  private final ClusterStatusListener clusterStatusListener;
120
121  public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
122      User user) {
123    this.conf = conf;
124    this.user = user;
125    if (user.isLoginFromKeytab()) {
126      spawnRenewalChore(user.getUGI());
127    }
128    this.connConf = new AsyncConnectionConfiguration(conf);
129    this.registry = registry;
130    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
131      this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
132    } else {
133      this.metrics = Optional.empty();
134    }
135    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, metrics.orElse(null));
136    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
137    this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
138    this.rpcTimeout =
139      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
140    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
141    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
142    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
143      nonceGenerator = PerClientRandomNonceGenerator.get();
144    } else {
145      nonceGenerator = NO_NONCE_GENERATOR;
146    }
147    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
148    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
149    ClusterStatusListener listener = null;
150    if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
151      // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
152      // it in, just like clusterId. Not a big problem for now as the default value is false.
153      Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
154        STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
155      if (listenerClass == null) {
156        LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
157      } else {
158        try {
159          listener = new ClusterStatusListener(
160            new ClusterStatusListener.DeadServerHandler() {
161              @Override
162              public void newDead(ServerName sn) {
163                locator.clearCache(sn);
164                rpcClient.cancelConnections(sn);
165              }
166            }, conf, listenerClass);
167        } catch (IOException e) {
168          LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
169            e);
170        }
171      }
172    }
173    this.clusterStatusListener = listener;
174  }
175
176  private void spawnRenewalChore(final UserGroupInformation user) {
177    authService = new ChoreService("Relogin service");
178    authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
179  }
180
181  @Override
182  public Configuration getConfiguration() {
183    return conf;
184  }
185
186  @Override
187  public void close() {
188    // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
189    // simple volatile flag.
190    if (closed) {
191      return;
192    }
193    IOUtils.closeQuietly(clusterStatusListener);
194    IOUtils.closeQuietly(rpcClient);
195    IOUtils.closeQuietly(registry);
196    if (authService != null) {
197      authService.shutdown();
198    }
199    metrics.ifPresent(MetricsConnection::shutdown);
200    closed = true;
201  }
202
203  @Override
204  public boolean isClosed() {
205    return closed;
206  }
207
208  @Override
209  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
210    return new AsyncTableRegionLocatorImpl(tableName, this);
211  }
212
213  // we will override this method for testing retry caller, so do not remove this method.
214  AsyncRegionLocator getLocator() {
215    return locator;
216  }
217
218  // ditto
219  @VisibleForTesting
220  public NonceGenerator getNonceGenerator() {
221    return nonceGenerator;
222  }
223
224  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
225    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
226  }
227
228  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
229    return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
230      getStubKey(ClientService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
231      () -> createRegionServerStub(serverName));
232  }
233
234  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
235    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
236  }
237
238  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
239    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
240  }
241
242  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
243    return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
244      getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
245      () -> createAdminServerStub(serverName));
246  }
247
248  CompletableFuture<MasterService.Interface> getMasterStub() {
249    return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
250      CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
251      addListener(registry.getMasterAddress(), (addr, error) -> {
252        if (error != null) {
253          future.completeExceptionally(error);
254        } else if (addr == null) {
255          future.completeExceptionally(new MasterNotRunningException(
256            "ZooKeeper available but no active master location found"));
257        } else {
258          LOG.debug("The fetched master address is {}", addr);
259          try {
260            future.complete(createMasterStub(addr));
261          } catch (IOException e) {
262            future.completeExceptionally(e);
263          }
264        }
265
266      });
267      return future;
268    }, stub -> true, "master stub");
269  }
270
271  void clearMasterStubCache(MasterService.Interface stub) {
272    masterStub.compareAndSet(stub, null);
273  }
274
275  Optional<ServerStatisticTracker> getStatisticsTracker() {
276    return stats;
277  }
278
279  ClientBackoffPolicy getBackoffPolicy() {
280    return backoffPolicy;
281  }
282
283  @Override
284  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
285    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
286
287      @Override
288      public AsyncTable<AdvancedScanResultConsumer> build() {
289        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
290      }
291    };
292  }
293
294  @Override
295  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
296      ExecutorService pool) {
297    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
298
299      @Override
300      public AsyncTable<ScanResultConsumer> build() {
301        RawAsyncTableImpl rawTable =
302          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
303        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
304      }
305    };
306  }
307
308  @Override
309  public AsyncAdminBuilder getAdminBuilder() {
310    return new AsyncAdminBuilderBase(connConf) {
311      @Override
312      public AsyncAdmin build() {
313        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
314      }
315    };
316  }
317
318  @Override
319  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
320    return new AsyncAdminBuilderBase(connConf) {
321      @Override
322      public AsyncAdmin build() {
323        RawAsyncHBaseAdmin rawAdmin =
324          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
325        return new AsyncHBaseAdmin(rawAdmin, pool);
326      }
327    };
328  }
329
330  @Override
331  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
332    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
333  }
334
335  @Override
336  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
337      ExecutorService pool) {
338    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
339      RETRY_TIMER);
340  }
341
342  @Override
343  public CompletableFuture<Hbck> getHbck() {
344    CompletableFuture<Hbck> future = new CompletableFuture<>();
345    addListener(registry.getMasterAddress(), (sn, error) -> {
346      if (error != null) {
347        future.completeExceptionally(error);
348      } else {
349        try {
350          future.complete(getHbck(sn));
351        } catch (IOException e) {
352          future.completeExceptionally(e);
353        }
354      }
355    });
356    return future;
357  }
358
359  @Override
360  public Hbck getHbck(ServerName masterServer) throws IOException {
361    // we will not create a new connection when creating a new protobuf stub, and for hbck there
362    // will be no performance consideration, so for simplification we will create a new stub every
363    // time instead of caching the stub here.
364    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
365      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
366  }
367
368  @Override
369  public void clearRegionLocationCache() {
370    locator.clearCache();
371  }
372
373  Optional<MetricsConnection> getConnectionMetrics() {
374    return metrics;
375  }
376}