001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
021import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
022import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
023import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
024
025import java.io.IOException;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicReference;
032import org.apache.commons.io.IOUtils;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.MasterNotRunningException;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.ipc.HBaseRpcController;
038import org.apache.hadoop.hbase.ipc.RpcClient;
039import org.apache.hadoop.hbase.ipc.RpcClientFactory;
040import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
041import org.apache.hadoop.hbase.security.User;
042import org.apache.hadoop.hbase.util.CollectionUtils;
043import org.apache.hadoop.hbase.util.Threads;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
049import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
050import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
051
052import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsMasterRunningResponse;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
058
059/**
060 * The implementation of AsyncConnection.
061 */
062@InterfaceAudience.Private
063class AsyncConnectionImpl implements AsyncConnection {
064
065  private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
066
067  @VisibleForTesting
068  static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
069    Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
070
071  private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
072
073  private final Configuration conf;
074
075  final AsyncConnectionConfiguration connConf;
076
077  private final User user;
078
079  final AsyncRegistry registry;
080
081  private final int rpcTimeout;
082
083  private final RpcClient rpcClient;
084
085  final RpcControllerFactory rpcControllerFactory;
086
087  private final boolean hostnameCanChange;
088
089  private final AsyncRegionLocator locator;
090
091  final AsyncRpcRetryingCallerFactory callerFactory;
092
093  private final NonceGenerator nonceGenerator;
094
095  private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
096  private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
097
098  private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
099
100  private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
101    new AtomicReference<>();
102
103  public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
104      User user) {
105    this.conf = conf;
106    this.user = user;
107    this.connConf = new AsyncConnectionConfiguration(conf);
108    this.registry = registry;
109    this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
110    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
111    this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
112    this.rpcTimeout =
113      (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
114    this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
115    this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
116    if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
117      nonceGenerator = PerClientRandomNonceGenerator.get();
118    } else {
119      nonceGenerator = NO_NONCE_GENERATOR;
120    }
121  }
122
123  @Override
124  public Configuration getConfiguration() {
125    return conf;
126  }
127
128  @Override
129  public void close() {
130    IOUtils.closeQuietly(rpcClient);
131    IOUtils.closeQuietly(registry);
132  }
133
134  @Override
135  public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
136    return new AsyncTableRegionLocatorImpl(tableName, locator);
137  }
138
139  // we will override this method for testing retry caller, so do not remove this method.
140  AsyncRegionLocator getLocator() {
141    return locator;
142  }
143
144  // ditto
145  @VisibleForTesting
146  public NonceGenerator getNonceGenerator() {
147    return nonceGenerator;
148  }
149
150  private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
151    return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
152  }
153
154  ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
155    return CollectionUtils.computeIfAbsentEx(rsStubs,
156      getStubKey(ClientService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
157      () -> createRegionServerStub(serverName));
158  }
159
160  private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
161    return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
162  }
163
164  private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
165    return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
166  }
167
168  AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
169    return CollectionUtils.computeIfAbsentEx(adminSubs,
170      getStubKey(AdminService.Interface.class.getSimpleName(), serverName, hostnameCanChange),
171      () -> createAdminServerStub(serverName));
172  }
173
174  private void makeMasterStub(CompletableFuture<MasterService.Interface> future) {
175    addListener(registry.getMasterAddress(), (sn, error) -> {
176      if (sn == null) {
177        String msg = "ZooKeeper available but no active master location found";
178        LOG.info(msg);
179        this.masterStubMakeFuture.getAndSet(null)
180          .completeExceptionally(new MasterNotRunningException(msg));
181        return;
182      }
183      try {
184        MasterService.Interface stub = createMasterStub(sn);
185        HBaseRpcController controller = getRpcController();
186        stub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
187          new RpcCallback<IsMasterRunningResponse>() {
188            @Override
189            public void run(IsMasterRunningResponse resp) {
190              if (controller.failed() || resp == null ||
191                (resp != null && !resp.getIsMasterRunning())) {
192                masterStubMakeFuture.getAndSet(null).completeExceptionally(
193                  new MasterNotRunningException("Master connection is not running anymore"));
194              } else {
195                masterStub.set(stub);
196                masterStubMakeFuture.set(null);
197                future.complete(stub);
198              }
199            }
200          });
201      } catch (IOException e) {
202        this.masterStubMakeFuture.getAndSet(null)
203          .completeExceptionally(new IOException("Failed to create async master stub", e));
204      }
205    });
206  }
207
208  CompletableFuture<MasterService.Interface> getMasterStub() {
209    MasterService.Interface masterStub = this.masterStub.get();
210
211    if (masterStub == null) {
212      for (;;) {
213        if (this.masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) {
214          CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get();
215          makeMasterStub(future);
216        } else {
217          CompletableFuture<MasterService.Interface> future = this.masterStubMakeFuture.get();
218          if (future != null) {
219            return future;
220          }
221        }
222      }
223    }
224
225    for (;;) {
226      if (masterStubMakeFuture.compareAndSet(null, new CompletableFuture<>())) {
227        CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get();
228        HBaseRpcController controller = getRpcController();
229        masterStub.isMasterRunning(controller, RequestConverter.buildIsMasterRunningRequest(),
230          new RpcCallback<IsMasterRunningResponse>() {
231            @Override
232            public void run(IsMasterRunningResponse resp) {
233              if (controller.failed() || resp == null ||
234                (resp != null && !resp.getIsMasterRunning())) {
235                makeMasterStub(future);
236              } else {
237                future.complete(masterStub);
238              }
239            }
240          });
241      } else {
242        CompletableFuture<MasterService.Interface> future = masterStubMakeFuture.get();
243        if (future != null) {
244          return future;
245        }
246      }
247    }
248  }
249
250  private HBaseRpcController getRpcController() {
251    HBaseRpcController controller = this.rpcControllerFactory.newController();
252    controller.setCallTimeout((int) TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
253    return controller;
254  }
255
256  @Override
257  public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
258    return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
259
260      @Override
261      public AsyncTable<AdvancedScanResultConsumer> build() {
262        return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
263      }
264    };
265  }
266
267  @Override
268  public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
269      ExecutorService pool) {
270    return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
271
272      @Override
273      public AsyncTable<ScanResultConsumer> build() {
274        RawAsyncTableImpl rawTable =
275          new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
276        return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
277      }
278    };
279  }
280
281  @Override
282  public AsyncAdminBuilder getAdminBuilder() {
283    return new AsyncAdminBuilderBase(connConf) {
284      @Override
285      public AsyncAdmin build() {
286        return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
287      }
288    };
289  }
290
291  @Override
292  public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
293    return new AsyncAdminBuilderBase(connConf) {
294      @Override
295      public AsyncAdmin build() {
296        RawAsyncHBaseAdmin rawAdmin =
297          new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
298        return new AsyncHBaseAdmin(rawAdmin, pool);
299      }
300    };
301  }
302
303  @Override
304  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
305    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
306  }
307
308  @Override
309  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
310      ExecutorService pool) {
311    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
312      RETRY_TIMER);
313  }
314
315  @Override
316  public CompletableFuture<Hbck> getHbck() {
317    CompletableFuture<Hbck> future = new CompletableFuture<>();
318    addListener(registry.getMasterAddress(), (sn, error) -> {
319      if (error != null) {
320        future.completeExceptionally(error);
321      } else {
322        try {
323          future.complete(getHbck(sn));
324        } catch (IOException e) {
325          future.completeExceptionally(e);
326        }
327      }
328    });
329    return future;
330  }
331
332  @Override
333  public Hbck getHbck(ServerName masterServer) throws IOException {
334    // we will not create a new connection when creating a new protobuf stub, and for hbck there
335    // will be no performance consideration, so for simplification we will create a new stub every
336    // time instead of caching the stub here.
337    return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
338      rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
339  }
340}