001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.net.URI;
025import java.security.PrivilegedExceptionAction;
026import java.util.Collections;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutorService;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.AuthUtil;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.security.User;
034import org.apache.hadoop.hbase.security.UserProvider;
035import org.apache.hadoop.hbase.trace.TraceUtil;
036import org.apache.hadoop.hbase.util.FutureUtils;
037import org.apache.hadoop.hbase.util.ReflectionUtils;
038import org.apache.hadoop.hbase.util.Strings;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
044
045/**
046 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
047 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
048 * {@link Connection}, {@link Table} implementations are retrieved with
049 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
050 *
051 * <pre>
052 * Connection connection = ConnectionFactory.createConnection(config);
053 * Table table = connection.getTable(TableName.valueOf("table1"));
054 * try {
055 *   // Use the table as needed, for a single operation and a single thread
056 * } finally {
057 *   table.close();
058 *   connection.close();
059 * }
060 * </pre>
061 *
062 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
063 * credentials if caller has following two configurations set:
064 * <ul>
065 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
066 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
067 * </ul>
068 * By this way, caller can directly connect to kerberized cluster without caring login and
069 * credentials renewal logic in application.
070 *
071 * <pre>
072 * </pre>
073 *
074 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
075 * implementations.
076 * @see Connection
077 * @since 0.99.0
078 */
079@InterfaceAudience.Public
080public class ConnectionFactory {
081
082  private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactory.class);
083
084  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
085    "hbase.client.async.connection.impl";
086
087  /** No public c.tors */
088  protected ConnectionFactory() {
089  }
090
091  /**
092   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
093   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
094   * connection share zookeeper connection, meta cache, and connections to region servers and
095   * masters. <br>
096   * The caller is responsible for calling {@link Connection#close()} on the returned connection
097   * instance. Typical usage:
098   *
099   * <pre>
100   * try (Connection connection = ConnectionFactory.createConnection(conf);
101   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
102   *   table.get(...);
103   *   ...
104   * }
105   * </pre>
106   *
107   * @return Connection object for <code>conf</code>
108   */
109  public static Connection createConnection() throws IOException {
110    return createConnection(HBaseConfiguration.create());
111  }
112
113  /**
114   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
115   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
116   * connection share zookeeper connection, meta cache, and connections to region servers and
117   * masters. <br>
118   * The caller is responsible for calling {@link Connection#close()} on the returned connection
119   * instance. Typical usage:
120   *
121   * <pre>
122   * try (Connection connection = ConnectionFactory.createConnection(conf);
123   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
124   *   table.get(...);
125   *   ...
126   * }
127   * </pre>
128   *
129   * @param connectionUri the connection uri for the hbase cluster
130   * @return Connection object for <code>conf</code>
131   */
132  public static Connection createConnection(URI connectionUri) throws IOException {
133    return createConnection(connectionUri, HBaseConfiguration.create());
134  }
135
136  /**
137   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
138   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
139   * created from returned connection share zookeeper connection(if used), meta cache, and
140   * connections to region servers and masters. <br>
141   * The caller is responsible for calling {@link Connection#close()} on the returned connection
142   * instance. Typical usage:
143   *
144   * <pre>
145   * try (Connection connection = ConnectionFactory.createConnection(conf);
146   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
147   *   table.get(...);
148   *   ...
149   * }
150   * </pre>
151   *
152   * @param conf configuration
153   * @return Connection object for <code>conf</code>
154   */
155  public static Connection createConnection(Configuration conf) throws IOException {
156    return createConnection(conf, null, AuthUtil.loginClient(conf));
157  }
158
159  /**
160   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
161   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
162   * created from returned connection share zookeeper connection(if used), meta cache, and
163   * connections to region servers and masters. <br>
164   * The caller is responsible for calling {@link Connection#close()} on the returned connection
165   * instance. Typical usage:
166   *
167   * <pre>
168   * try (Connection connection = ConnectionFactory.createConnection(conf);
169   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
170   *   table.get(...);
171   *   ...
172   * }
173   * </pre>
174   *
175   * @param connectionUri the connection uri for the hbase cluster
176   * @param conf          configuration
177   * @return Connection object for <code>conf</code>
178   */
179  public static Connection createConnection(URI connectionUri, Configuration conf)
180    throws IOException {
181    return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf));
182  }
183
184  /**
185   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
186   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
187   * created from returned connection share zookeeper connection(if used), meta cache, and
188   * connections to region servers and masters. <br>
189   * The caller is responsible for calling {@link Connection#close()} on the returned connection
190   * instance. Typical usage:
191   *
192   * <pre>
193   * try (Connection connection = ConnectionFactory.createConnection(conf);
194   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
195   *   table.get(...);
196   *   ...
197   * }
198   * </pre>
199   *
200   * @param conf configuration
201   * @param pool the thread pool to use for batch operations
202   * @return Connection object for <code>conf</code>
203   */
204  public static Connection createConnection(Configuration conf, ExecutorService pool)
205    throws IOException {
206    return createConnection(conf, pool, AuthUtil.loginClient(conf));
207  }
208
209  /**
210   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
211   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
212   * created from returned connection share zookeeper connection(if used), meta cache, and
213   * connections to region servers and masters. <br>
214   * The caller is responsible for calling {@link Connection#close()} on the returned connection
215   * instance. Typical usage:
216   *
217   * <pre>
218   * try (Connection connection = ConnectionFactory.createConnection(conf);
219   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
220   *   table.get(...);
221   *   ...
222   * }
223   * </pre>
224   *
225   * @param connectionUri the connection uri for the hbase cluster
226   * @param conf          configuration
227   * @param pool          the thread pool to use for batch operations
228   * @return Connection object for <code>conf</code>
229   */
230  public static Connection createConnection(URI connectionUri, Configuration conf,
231    ExecutorService pool) throws IOException {
232    return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf));
233  }
234
235  /**
236   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
237   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
238   * created from returned connection share zookeeper connection(if used), meta cache, and
239   * connections to region servers and masters. <br>
240   * The caller is responsible for calling {@link Connection#close()} on the returned connection
241   * instance. Typical usage:
242   *
243   * <pre>
244   * try (Connection connection = ConnectionFactory.createConnection(conf);
245   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
246   *   table.get(...);
247   *   ...
248   * }
249   * </pre>
250   *
251   * @param conf configuration
252   * @param user the user the connection is for
253   * @return Connection object for <code>conf</code>
254   */
255  public static Connection createConnection(Configuration conf, User user) throws IOException {
256    return createConnection(conf, null, user);
257  }
258
259  /**
260   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
261   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
262   * created from returned connection share zookeeper connection(if used), meta cache, and
263   * connections to region servers and masters. <br>
264   * The caller is responsible for calling {@link Connection#close()} on the returned connection
265   * instance. Typical usage:
266   *
267   * <pre>
268   * try (Connection connection = ConnectionFactory.createConnection(conf);
269   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
270   *   table.get(...);
271   *   ...
272   * }
273   * </pre>
274   *
275   * @param connectionUri the connection uri for the hbase cluster
276   * @param conf          configuration
277   * @param user          the user the connection is for
278   * @return Connection object for <code>conf</code>
279   */
280  public static Connection createConnection(URI connectionUri, Configuration conf, User user)
281    throws IOException {
282    return createConnection(connectionUri, conf, null, user);
283  }
284
285  /**
286   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
287   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
288   * created from returned connection share zookeeper connection(if used), meta cache, and
289   * connections to region servers and masters. <br>
290   * The caller is responsible for calling {@link Connection#close()} on the returned connection
291   * instance. Typical usage:
292   *
293   * <pre>
294   * try (Connection connection = ConnectionFactory.createConnection(conf);
295   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
296   *   table.get(...);
297   *   ...
298   * }
299   * </pre>
300   *
301   * @param conf configuration
302   * @param user the user the connection is for
303   * @param pool the thread pool to use for batch operations
304   * @return Connection object for <code>conf</code>
305   */
306  public static Connection createConnection(Configuration conf, ExecutorService pool,
307    final User user) throws IOException {
308    return createConnection(conf, pool, user, Collections.emptyMap());
309  }
310
311  /**
312   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
313   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
314   * created from returned connection share zookeeper connection(if used), meta cache, and
315   * connections to region servers and masters. <br>
316   * The caller is responsible for calling {@link Connection#close()} on the returned connection
317   * instance. Typical usage:
318   *
319   * <pre>
320   * try (Connection connection = ConnectionFactory.createConnection(conf);
321   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
322   *   table.get(...);
323   *   ...
324   * }
325   * </pre>
326   *
327   * @param connectionUri the connection uri for the hbase cluster
328   * @param conf          configuration
329   * @param user          the user the connection is for
330   * @param pool          the thread pool to use for batch operations
331   * @return Connection object for <code>conf</code>
332   */
333  public static Connection createConnection(URI connectionUri, Configuration conf,
334    ExecutorService pool, User user) throws IOException {
335    return createConnection(connectionUri, conf, pool, user, Collections.emptyMap());
336  }
337
338  /**
339   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
340   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
341   * created from returned connection share zookeeper connection(if used), meta cache, and
342   * connections to region servers and masters. <br>
343   * The caller is responsible for calling {@link Connection#close()} on the returned connection
344   * instance. Typical usage:
345   *
346   * <pre>
347   * try (Connection connection = ConnectionFactory.createConnection(conf);
348   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
349   *   table.get(...);
350   *   ...
351   * }
352   * </pre>
353   *
354   * @param conf                 configuration
355   * @param user                 the user the connection is for
356   * @param pool                 the thread pool to use for batch operations
357   * @param connectionAttributes attributes to be sent along to server during connection establish
358   * @return Connection object for <code>conf</code>
359   */
360  public static Connection createConnection(Configuration conf, ExecutorService pool,
361    final User user, Map<String, byte[]> connectionAttributes) throws IOException {
362    return createConnection(null, conf, pool, user, connectionAttributes);
363  }
364
365  /**
366   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
367   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
368   * created from returned connection share zookeeper connection(if used), meta cache, and
369   * connections to region servers and masters. <br>
370   * The caller is responsible for calling {@link Connection#close()} on the returned connection
371   * instance. Typical usage:
372   *
373   * <pre>
374   * Connection connection = ConnectionFactory.createConnection(conf);
375   * Table table = connection.getTable(TableName.valueOf("table1"));
376   * try (Connection connection = ConnectionFactory.createConnection(conf);
377   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
378   *   table.get(...);
379   *   ...
380   * }
381   * </pre>
382   *
383   * @param connectionUri        the connection uri for the hbase cluster
384   * @param conf                 configuration
385   * @param user                 the user the connection is for
386   * @param pool                 the thread pool to use for batch operations
387   * @param connectionAttributes attributes to be sent along to server during connection establish
388   * @return Connection object for <code>conf</code>
389   */
390  public static Connection createConnection(URI connectionUri, Configuration conf,
391    ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes)
392    throws IOException {
393    Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
394      ConnectionOverAsyncConnection.class, Connection.class);
395    if (clazz != ConnectionOverAsyncConnection.class) {
396      return TraceUtil.trace(() -> {
397        try {
398          // Default HCM#HCI is not accessible; make it so before invoking.
399          Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
400            ExecutorService.class, User.class, ConnectionRegistry.class, Map.class);
401          constructor.setAccessible(true);
402          ConnectionRegistry registry = connectionUri != null
403            ? ConnectionRegistryFactory.create(connectionUri, conf, user)
404            : ConnectionRegistryFactory.create(conf, user);
405          return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
406            .newInstance(conf, pool, user, registry, connectionAttributes));
407        } catch (NoSuchMethodException e) {
408          LOG.debug("Constructor with connection registry not found for class {},"
409            + " fallback to use old constructor", clazz.getName(), e);
410        } catch (Exception e) {
411          Throwables.throwIfInstanceOf(e, IOException.class);
412          Throwables.throwIfUnchecked(e);
413          throw new IOException(e);
414        }
415
416        try {
417          // Default HCM#HCI is not accessible; make it so before invoking.
418          Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
419            ExecutorService.class, User.class, Map.class);
420          constructor.setAccessible(true);
421          return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
422            .newInstance(conf, pool, user, connectionAttributes));
423        } catch (Exception e) {
424          Throwables.throwIfInstanceOf(e, IOException.class);
425          Throwables.throwIfUnchecked(e);
426          throw new IOException(e);
427        }
428      }, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection"));
429    } else {
430      return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes))
431        .toConnection();
432    }
433  }
434
435  /**
436   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
437   * @see #createAsyncConnection(Configuration)
438   * @return AsyncConnection object wrapped by CompletableFuture
439   */
440  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
441    return createAsyncConnection(HBaseConfiguration.create());
442  }
443
444  /**
445   * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration.
446   * @param connectionUri the connection uri for the hbase cluster
447   * @see #createAsyncConnection(URI, Configuration)
448   * @return AsyncConnection object wrapped by CompletableFuture
449   */
450  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) {
451    return createAsyncConnection(connectionUri, HBaseConfiguration.create());
452  }
453
454  /**
455   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
456   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
457   * initialize the {@link UserProvider}.
458   * @param conf configuration
459   * @return AsyncConnection object wrapped by CompletableFuture
460   * @see #createAsyncConnection(Configuration, User)
461   * @see UserProvider
462   */
463  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
464    return createAsyncConnection(null, conf);
465  }
466
467  /**
468   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri},
469   * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will
470   * also be used to initialize the {@link UserProvider}.
471   * @param connectionUri the connection uri for the hbase cluster
472   * @param conf          configuration
473   * @return AsyncConnection object wrapped by CompletableFuture
474   * @see #createAsyncConnection(Configuration, User)
475   * @see UserProvider
476   */
477  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
478    Configuration conf) {
479    User user;
480    try {
481      user = AuthUtil.loginClient(conf);
482    } catch (IOException e) {
483      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
484      future.completeExceptionally(e);
485      return future;
486    }
487    return createAsyncConnection(connectionUri, conf, user);
488  }
489
490  /**
491   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
492   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
493   * interfaces created from returned connection share zookeeper connection, meta cache, and
494   * connections to region servers and masters.
495   * <p>
496   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
497   * connection instance.
498   * <p>
499   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
500   * as it is thread safe.
501   * @param conf configuration
502   * @param user the user the asynchronous connection is for
503   * @return AsyncConnection object wrapped by CompletableFuture
504   */
505  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
506    final User user) {
507    return createAsyncConnection(null, conf, user);
508  }
509
510  /**
511   * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
512   * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
513   * All tables and interfaces created from returned connection share zookeeper connection(if used),
514   * meta cache, and connections to region servers and masters.
515   * <p>
516   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
517   * connection instance.
518   * <p>
519   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
520   * as it is thread safe.
521   * @param connectionUri the connection uri for the hbase cluster
522   * @param conf          configuration
523   * @param user          the user the asynchronous connection is for
524   * @return AsyncConnection object wrapped by CompletableFuture
525   */
526  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
527    Configuration conf, final User user) {
528    return createAsyncConnection(connectionUri, conf, user, null);
529  }
530
531  /**
532   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
533   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
534   * interfaces created from returned connection share zookeeper connection, meta cache, and
535   * connections to region servers and masters.
536   * <p>
537   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
538   * connection instance.
539   * <p>
540   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
541   * as it is thread safe.
542   * @param conf                 configuration
543   * @param user                 the user the asynchronous connection is for
544   * @param connectionAttributes attributes to be sent along to server during connection establish
545   * @return AsyncConnection object wrapped by CompletableFuture
546   */
547  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
548    final User user, Map<String, byte[]> connectionAttributes) {
549    return createAsyncConnection(null, conf, user, connectionAttributes);
550  }
551
552  /**
553   * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
554   * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
555   * All tables and interfaces created from returned connection share zookeeper connection(if used),
556   * meta cache, and connections to region servers and masters.
557   * <p>
558   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
559   * connection instance.
560   * <p>
561   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
562   * as it is thread safe.
563   * @param connectionUri        the connection uri for the hbase cluster
564   * @param conf                 configuration
565   * @param user                 the user the asynchronous connection is for
566   * @param connectionAttributes attributes to be sent along to server during connection establish
567   * @return AsyncConnection object wrapped by CompletableFuture
568   */
569  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
570    Configuration conf, final User user, Map<String, byte[]> connectionAttributes) {
571    return TraceUtil.tracedFuture(() -> {
572      ConnectionRegistry registry;
573      Configuration appliedConf;
574      try {
575        if (connectionUri != null) {
576          appliedConf = new Configuration(conf);
577          Strings.applyURIQueriesToConf(connectionUri, appliedConf);
578          registry = ConnectionRegistryFactory.create(connectionUri, appliedConf, user);
579        } else {
580          appliedConf = conf;
581          registry = ConnectionRegistryFactory.create(appliedConf, user);
582        }
583      } catch (Exception e) {
584        return FutureUtils.failedFuture(e);
585      }
586      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
587      addListener(registry.getClusterId(), (clusterId, error) -> {
588        if (error != null) {
589          registry.close();
590          future.completeExceptionally(error);
591          return;
592        }
593        if (clusterId == null) {
594          registry.close();
595          future.completeExceptionally(new IOException("clusterid came back null"));
596          return;
597        }
598        Class<? extends AsyncConnection> clazz = appliedConf.getClass(
599          HBASE_CLIENT_ASYNC_CONNECTION_IMPL, AsyncConnectionImpl.class, AsyncConnection.class);
600        try {
601          future.complete(user.runAs((PrivilegedExceptionAction<
602            ? extends AsyncConnection>) () -> ReflectionUtils.newInstance(clazz, appliedConf,
603              registry, clusterId, null, user, connectionAttributes)));
604        } catch (Exception e) {
605          registry.close();
606          future.completeExceptionally(e);
607        }
608      });
609      return future;
610    }, "ConnectionFactory.createAsyncConnection");
611  }
612}