001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.net.URI;
025import java.security.PrivilegedExceptionAction;
026import java.util.Collections;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutorService;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.AuthUtil;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.security.User;
034import org.apache.hadoop.hbase.security.UserProvider;
035import org.apache.hadoop.hbase.trace.TraceUtil;
036import org.apache.hadoop.hbase.util.FutureUtils;
037import org.apache.hadoop.hbase.util.ReflectionUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
043
044/**
045 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
046 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
047 * {@link Connection}, {@link Table} implementations are retrieved with
048 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
049 *
050 * <pre>
051 * Connection connection = ConnectionFactory.createConnection(config);
052 * Table table = connection.getTable(TableName.valueOf("table1"));
053 * try {
054 *   // Use the table as needed, for a single operation and a single thread
055 * } finally {
056 *   table.close();
057 *   connection.close();
058 * }
059 * </pre>
060 *
061 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
062 * credentials if caller has following two configurations set:
063 * <ul>
064 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
065 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
066 * </ul>
067 * By this way, caller can directly connect to kerberized cluster without caring login and
068 * credentials renewal logic in application.
069 *
070 * <pre>
071 * </pre>
072 *
073 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
074 * implementations.
075 * @see Connection
076 * @since 0.99.0
077 */
078@InterfaceAudience.Public
079public class ConnectionFactory {
080
081  private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactory.class);
082
083  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
084    "hbase.client.async.connection.impl";
085
086  /** No public c.tors */
087  protected ConnectionFactory() {
088  }
089
090  /**
091   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
092   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
093   * connection share zookeeper connection, meta cache, and connections to region servers and
094   * masters. <br>
095   * The caller is responsible for calling {@link Connection#close()} on the returned connection
096   * instance. Typical usage:
097   *
098   * <pre>
099   * try (Connection connection = ConnectionFactory.createConnection(conf);
100   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
101   *   table.get(...);
102   *   ...
103   * }
104   * </pre>
105   *
106   * @return Connection object for <code>conf</code>
107   */
108  public static Connection createConnection() throws IOException {
109    return createConnection(HBaseConfiguration.create());
110  }
111
112  /**
113   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
114   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
115   * connection share zookeeper connection, meta cache, and connections to region servers and
116   * masters. <br>
117   * The caller is responsible for calling {@link Connection#close()} on the returned connection
118   * instance. Typical usage:
119   *
120   * <pre>
121   * try (Connection connection = ConnectionFactory.createConnection(conf);
122   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
123   *   table.get(...);
124   *   ...
125   * }
126   * </pre>
127   *
128   * @param connectionUri the connection uri for the hbase cluster
129   * @return Connection object for <code>conf</code>
130   */
131  public static Connection createConnection(URI connectionUri) throws IOException {
132    return createConnection(connectionUri, HBaseConfiguration.create());
133  }
134
135  /**
136   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
137   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
138   * created from returned connection share zookeeper connection(if used), meta cache, and
139   * connections to region servers and masters. <br>
140   * The caller is responsible for calling {@link Connection#close()} on the returned connection
141   * instance. Typical usage:
142   *
143   * <pre>
144   * try (Connection connection = ConnectionFactory.createConnection(conf);
145   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
146   *   table.get(...);
147   *   ...
148   * }
149   * </pre>
150   *
151   * @param conf configuration
152   * @return Connection object for <code>conf</code>
153   */
154  public static Connection createConnection(Configuration conf) throws IOException {
155    return createConnection(conf, null, AuthUtil.loginClient(conf));
156  }
157
158  /**
159   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
160   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
161   * created from returned connection share zookeeper connection(if used), meta cache, and
162   * connections to region servers and masters. <br>
163   * The caller is responsible for calling {@link Connection#close()} on the returned connection
164   * instance. Typical usage:
165   *
166   * <pre>
167   * try (Connection connection = ConnectionFactory.createConnection(conf);
168   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
169   *   table.get(...);
170   *   ...
171   * }
172   * </pre>
173   *
174   * @param connectionUri the connection uri for the hbase cluster
175   * @param conf          configuration
176   * @return Connection object for <code>conf</code>
177   */
178  public static Connection createConnection(URI connectionUri, Configuration conf)
179    throws IOException {
180    return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf));
181  }
182
183  /**
184   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
185   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
186   * created from returned connection share zookeeper connection(if used), meta cache, and
187   * connections to region servers and masters. <br>
188   * The caller is responsible for calling {@link Connection#close()} on the returned connection
189   * instance. Typical usage:
190   *
191   * <pre>
192   * try (Connection connection = ConnectionFactory.createConnection(conf);
193   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
194   *   table.get(...);
195   *   ...
196   * }
197   * </pre>
198   *
199   * @param conf configuration
200   * @param pool the thread pool to use for batch operations
201   * @return Connection object for <code>conf</code>
202   */
203  public static Connection createConnection(Configuration conf, ExecutorService pool)
204    throws IOException {
205    return createConnection(conf, pool, AuthUtil.loginClient(conf));
206  }
207
208  /**
209   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
210   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
211   * created from returned connection share zookeeper connection(if used), meta cache, and
212   * connections to region servers and masters. <br>
213   * The caller is responsible for calling {@link Connection#close()} on the returned connection
214   * instance. Typical usage:
215   *
216   * <pre>
217   * try (Connection connection = ConnectionFactory.createConnection(conf);
218   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
219   *   table.get(...);
220   *   ...
221   * }
222   * </pre>
223   *
224   * @param connectionUri the connection uri for the hbase cluster
225   * @param conf          configuration
226   * @param pool          the thread pool to use for batch operations
227   * @return Connection object for <code>conf</code>
228   */
229  public static Connection createConnection(URI connectionUri, Configuration conf,
230    ExecutorService pool) throws IOException {
231    return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf));
232  }
233
234  /**
235   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
236   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
237   * created from returned connection share zookeeper connection(if used), meta cache, and
238   * connections to region servers and masters. <br>
239   * The caller is responsible for calling {@link Connection#close()} on the returned connection
240   * instance. Typical usage:
241   *
242   * <pre>
243   * try (Connection connection = ConnectionFactory.createConnection(conf);
244   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
245   *   table.get(...);
246   *   ...
247   * }
248   * </pre>
249   *
250   * @param conf configuration
251   * @param user the user the connection is for
252   * @return Connection object for <code>conf</code>
253   */
254  public static Connection createConnection(Configuration conf, User user) throws IOException {
255    return createConnection(conf, null, user);
256  }
257
258  /**
259   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
260   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
261   * created from returned connection share zookeeper connection(if used), meta cache, and
262   * connections to region servers and masters. <br>
263   * The caller is responsible for calling {@link Connection#close()} on the returned connection
264   * instance. Typical usage:
265   *
266   * <pre>
267   * try (Connection connection = ConnectionFactory.createConnection(conf);
268   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
269   *   table.get(...);
270   *   ...
271   * }
272   * </pre>
273   *
274   * @param connectionUri the connection uri for the hbase cluster
275   * @param conf          configuration
276   * @param user          the user the connection is for
277   * @return Connection object for <code>conf</code>
278   */
279  public static Connection createConnection(URI connectionUri, Configuration conf, User user)
280    throws IOException {
281    return createConnection(connectionUri, conf, null, user);
282  }
283
284  /**
285   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
286   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
287   * created from returned connection share zookeeper connection(if used), meta cache, and
288   * connections to region servers and masters. <br>
289   * The caller is responsible for calling {@link Connection#close()} on the returned connection
290   * instance. Typical usage:
291   *
292   * <pre>
293   * try (Connection connection = ConnectionFactory.createConnection(conf);
294   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
295   *   table.get(...);
296   *   ...
297   * }
298   * </pre>
299   *
300   * @param conf configuration
301   * @param user the user the connection is for
302   * @param pool the thread pool to use for batch operations
303   * @return Connection object for <code>conf</code>
304   */
305  public static Connection createConnection(Configuration conf, ExecutorService pool,
306    final User user) throws IOException {
307    return createConnection(conf, pool, user, Collections.emptyMap());
308  }
309
310  /**
311   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
312   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
313   * created from returned connection share zookeeper connection(if used), meta cache, and
314   * connections to region servers and masters. <br>
315   * The caller is responsible for calling {@link Connection#close()} on the returned connection
316   * instance. Typical usage:
317   *
318   * <pre>
319   * try (Connection connection = ConnectionFactory.createConnection(conf);
320   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
321   *   table.get(...);
322   *   ...
323   * }
324   * </pre>
325   *
326   * @param connectionUri the connection uri for the hbase cluster
327   * @param conf          configuration
328   * @param user          the user the connection is for
329   * @param pool          the thread pool to use for batch operations
330   * @return Connection object for <code>conf</code>
331   */
332  public static Connection createConnection(URI connectionUri, Configuration conf,
333    ExecutorService pool, User user) throws IOException {
334    return createConnection(connectionUri, conf, pool, user, Collections.emptyMap());
335  }
336
337  /**
338   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
339   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
340   * created from returned connection share zookeeper connection(if used), meta cache, and
341   * connections to region servers and masters. <br>
342   * The caller is responsible for calling {@link Connection#close()} on the returned connection
343   * instance. Typical usage:
344   *
345   * <pre>
346   * try (Connection connection = ConnectionFactory.createConnection(conf);
347   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
348   *   table.get(...);
349   *   ...
350   * }
351   * </pre>
352   *
353   * @param conf                 configuration
354   * @param user                 the user the connection is for
355   * @param pool                 the thread pool to use for batch operations
356   * @param connectionAttributes attributes to be sent along to server during connection establish
357   * @return Connection object for <code>conf</code>
358   */
359  public static Connection createConnection(Configuration conf, ExecutorService pool,
360    final User user, Map<String, byte[]> connectionAttributes) throws IOException {
361    return createConnection(null, conf, pool, user, connectionAttributes);
362  }
363
364  /**
365   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
366   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
367   * created from returned connection share zookeeper connection(if used), meta cache, and
368   * connections to region servers and masters. <br>
369   * The caller is responsible for calling {@link Connection#close()} on the returned connection
370   * instance. Typical usage:
371   *
372   * <pre>
373   * Connection connection = ConnectionFactory.createConnection(conf);
374   * Table table = connection.getTable(TableName.valueOf("table1"));
375   * try (Connection connection = ConnectionFactory.createConnection(conf);
376   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
377   *   table.get(...);
378   *   ...
379   * }
380   * </pre>
381   *
382   * @param connectionUri        the connection uri for the hbase cluster
383   * @param conf                 configuration
384   * @param user                 the user the connection is for
385   * @param pool                 the thread pool to use for batch operations
386   * @param connectionAttributes attributes to be sent along to server during connection establish
387   * @return Connection object for <code>conf</code>
388   */
389  public static Connection createConnection(URI connectionUri, Configuration conf,
390    ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes)
391    throws IOException {
392    Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
393      ConnectionOverAsyncConnection.class, Connection.class);
394    if (clazz != ConnectionOverAsyncConnection.class) {
395      return TraceUtil.trace(() -> {
396        try {
397          // Default HCM#HCI is not accessible; make it so before invoking.
398          Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
399            ExecutorService.class, User.class, ConnectionRegistry.class, Map.class);
400          constructor.setAccessible(true);
401          ConnectionRegistry registry = connectionUri != null
402            ? ConnectionRegistryFactory.create(connectionUri, conf, user)
403            : ConnectionRegistryFactory.create(conf, user);
404          return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
405            .newInstance(conf, pool, user, registry, connectionAttributes));
406        } catch (NoSuchMethodException e) {
407          LOG.debug("Constructor with connection registry not found for class {},"
408            + " fallback to use old constructor", clazz.getName(), e);
409        } catch (Exception e) {
410          Throwables.throwIfInstanceOf(e, IOException.class);
411          Throwables.throwIfUnchecked(e);
412          throw new IOException(e);
413        }
414
415        try {
416          // Default HCM#HCI is not accessible; make it so before invoking.
417          Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
418            ExecutorService.class, User.class, Map.class);
419          constructor.setAccessible(true);
420          return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
421            .newInstance(conf, pool, user, connectionAttributes));
422        } catch (Exception e) {
423          Throwables.throwIfInstanceOf(e, IOException.class);
424          Throwables.throwIfUnchecked(e);
425          throw new IOException(e);
426        }
427      }, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection"));
428    } else {
429      return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes))
430        .toConnection();
431    }
432  }
433
434  /**
435   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
436   * @see #createAsyncConnection(Configuration)
437   * @return AsyncConnection object wrapped by CompletableFuture
438   */
439  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
440    return createAsyncConnection(HBaseConfiguration.create());
441  }
442
443  /**
444   * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration.
445   * @param connectionUri the connection uri for the hbase cluster
446   * @see #createAsyncConnection(URI, Configuration)
447   * @return AsyncConnection object wrapped by CompletableFuture
448   */
449  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) {
450    return createAsyncConnection(connectionUri, HBaseConfiguration.create());
451  }
452
453  /**
454   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
455   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
456   * initialize the {@link UserProvider}.
457   * @param conf configuration
458   * @return AsyncConnection object wrapped by CompletableFuture
459   * @see #createAsyncConnection(Configuration, User)
460   * @see UserProvider
461   */
462  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
463    return createAsyncConnection(null, conf);
464  }
465
466  /**
467   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri},
468   * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will
469   * also be used to initialize the {@link UserProvider}.
470   * @param connectionUri the connection uri for the hbase cluster
471   * @param conf          configuration
472   * @return AsyncConnection object wrapped by CompletableFuture
473   * @see #createAsyncConnection(Configuration, User)
474   * @see UserProvider
475   */
476  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
477    Configuration conf) {
478    User user;
479    try {
480      user = AuthUtil.loginClient(conf);
481    } catch (IOException e) {
482      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
483      future.completeExceptionally(e);
484      return future;
485    }
486    return createAsyncConnection(connectionUri, conf, user);
487  }
488
489  /**
490   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
491   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
492   * interfaces created from returned connection share zookeeper connection, meta cache, and
493   * connections to region servers and masters.
494   * <p>
495   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
496   * connection instance.
497   * <p>
498   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
499   * as it is thread safe.
500   * @param conf configuration
501   * @param user the user the asynchronous connection is for
502   * @return AsyncConnection object wrapped by CompletableFuture
503   */
504  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
505    final User user) {
506    return createAsyncConnection(null, conf, user);
507  }
508
509  /**
510   * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
511   * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
512   * All tables and interfaces created from returned connection share zookeeper connection(if used),
513   * meta cache, and connections to region servers and masters.
514   * <p>
515   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
516   * connection instance.
517   * <p>
518   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
519   * as it is thread safe.
520   * @param connectionUri the connection uri for the hbase cluster
521   * @param conf          configuration
522   * @param user          the user the asynchronous connection is for
523   * @return AsyncConnection object wrapped by CompletableFuture
524   */
525  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
526    Configuration conf, final User user) {
527    return createAsyncConnection(connectionUri, conf, user, null);
528  }
529
530  /**
531   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
532   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
533   * interfaces created from returned connection share zookeeper connection, meta cache, and
534   * connections to region servers and masters.
535   * <p>
536   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
537   * connection instance.
538   * <p>
539   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
540   * as it is thread safe.
541   * @param conf                 configuration
542   * @param user                 the user the asynchronous connection is for
543   * @param connectionAttributes attributes to be sent along to server during connection establish
544   * @return AsyncConnection object wrapped by CompletableFuture
545   */
546  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
547    final User user, Map<String, byte[]> connectionAttributes) {
548    return createAsyncConnection(null, conf, user, connectionAttributes);
549  }
550
551  /**
552   * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
553   * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
554   * All tables and interfaces created from returned connection share zookeeper connection(if used),
555   * meta cache, and connections to region servers and masters.
556   * <p>
557   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
558   * connection instance.
559   * <p>
560   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
561   * as it is thread safe.
562   * @param connectionUri        the connection uri for the hbase cluster
563   * @param conf                 configuration
564   * @param user                 the user the asynchronous connection is for
565   * @param connectionAttributes attributes to be sent along to server during connection establish
566   * @return AsyncConnection object wrapped by CompletableFuture
567   */
568  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
569    Configuration conf, final User user, Map<String, byte[]> connectionAttributes) {
570    return TraceUtil.tracedFuture(() -> {
571      ConnectionRegistry registry;
572      try {
573        registry = connectionUri != null
574          ? ConnectionRegistryFactory.create(connectionUri, conf, user)
575          : ConnectionRegistryFactory.create(conf, user);
576      } catch (Exception e) {
577        return FutureUtils.failedFuture(e);
578      }
579      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
580      addListener(registry.getClusterId(), (clusterId, error) -> {
581        if (error != null) {
582          registry.close();
583          future.completeExceptionally(error);
584          return;
585        }
586        if (clusterId == null) {
587          registry.close();
588          future.completeExceptionally(new IOException("clusterid came back null"));
589          return;
590        }
591        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
592          AsyncConnectionImpl.class, AsyncConnection.class);
593        try {
594          future.complete(
595            user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
596              .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes)));
597        } catch (Exception e) {
598          registry.close();
599          future.completeExceptionally(e);
600        }
601      });
602      return future;
603    }, "ConnectionFactory.createAsyncConnection");
604  }
605}