001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.lang.reflect.Constructor; 024import java.net.URI; 025import java.security.PrivilegedExceptionAction; 026import java.util.Collections; 027import java.util.Map; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.ExecutorService; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.AuthUtil; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.security.User; 034import org.apache.hadoop.hbase.security.UserProvider; 035import org.apache.hadoop.hbase.trace.TraceUtil; 036import org.apache.hadoop.hbase.util.FutureUtils; 037import org.apache.hadoop.hbase.util.ReflectionUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 043 044/** 045 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of 046 * the {@link Connection}s to the cluster is the responsibility of the caller. From a 047 * {@link Connection}, {@link Table} implementations are retrieved with 048 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example: 049 * 050 * <pre> 051 * Connection connection = ConnectionFactory.createConnection(config); 052 * Table table = connection.getTable(TableName.valueOf("table1")); 053 * try { 054 * // Use the table as needed, for a single operation and a single thread 055 * } finally { 056 * table.close(); 057 * connection.close(); 058 * } 059 * </pre> 060 * 061 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos 062 * credentials if caller has following two configurations set: 063 * <ul> 064 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem 065 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use 066 * </ul> 067 * By this way, caller can directly connect to kerberized cluster without caring login and 068 * credentials renewal logic in application. 069 * 070 * <pre> 071 * </pre> 072 * 073 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator} 074 * implementations. 075 * @see Connection 076 * @since 0.99.0 077 */ 078@InterfaceAudience.Public 079public class ConnectionFactory { 080 081 private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactory.class); 082 083 public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = 084 "hbase.client.async.connection.impl"; 085 086 /** No public c.tors */ 087 protected ConnectionFactory() { 088 } 089 090 /** 091 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 092 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 093 * connection share zookeeper connection, meta cache, and connections to region servers and 094 * masters. <br> 095 * The caller is responsible for calling {@link Connection#close()} on the returned connection 096 * instance. Typical usage: 097 * 098 * <pre> 099 * try (Connection connection = ConnectionFactory.createConnection(conf); 100 * Table table = connection.getTable(TableName.valueOf("table1"))) { 101 * table.get(...); 102 * ... 103 * } 104 * </pre> 105 * 106 * @return Connection object for <code>conf</code> 107 */ 108 public static Connection createConnection() throws IOException { 109 return createConnection(HBaseConfiguration.create()); 110 } 111 112 /** 113 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 114 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 115 * connection share zookeeper connection, meta cache, and connections to region servers and 116 * masters. <br> 117 * The caller is responsible for calling {@link Connection#close()} on the returned connection 118 * instance. Typical usage: 119 * 120 * <pre> 121 * try (Connection connection = ConnectionFactory.createConnection(conf); 122 * Table table = connection.getTable(TableName.valueOf("table1"))) { 123 * table.get(...); 124 * ... 125 * } 126 * </pre> 127 * 128 * @param connectionUri the connection uri for the hbase cluster 129 * @return Connection object for <code>conf</code> 130 */ 131 public static Connection createConnection(URI connectionUri) throws IOException { 132 return createConnection(connectionUri, HBaseConfiguration.create()); 133 } 134 135 /** 136 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 137 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 138 * created from returned connection share zookeeper connection(if used), meta cache, and 139 * connections to region servers and masters. <br> 140 * The caller is responsible for calling {@link Connection#close()} on the returned connection 141 * instance. Typical usage: 142 * 143 * <pre> 144 * try (Connection connection = ConnectionFactory.createConnection(conf); 145 * Table table = connection.getTable(TableName.valueOf("table1"))) { 146 * table.get(...); 147 * ... 148 * } 149 * </pre> 150 * 151 * @param conf configuration 152 * @return Connection object for <code>conf</code> 153 */ 154 public static Connection createConnection(Configuration conf) throws IOException { 155 return createConnection(conf, null, AuthUtil.loginClient(conf)); 156 } 157 158 /** 159 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 160 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 161 * created from returned connection share zookeeper connection(if used), meta cache, and 162 * connections to region servers and masters. <br> 163 * The caller is responsible for calling {@link Connection#close()} on the returned connection 164 * instance. Typical usage: 165 * 166 * <pre> 167 * try (Connection connection = ConnectionFactory.createConnection(conf); 168 * Table table = connection.getTable(TableName.valueOf("table1"))) { 169 * table.get(...); 170 * ... 171 * } 172 * </pre> 173 * 174 * @param connectionUri the connection uri for the hbase cluster 175 * @param conf configuration 176 * @return Connection object for <code>conf</code> 177 */ 178 public static Connection createConnection(URI connectionUri, Configuration conf) 179 throws IOException { 180 return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf)); 181 } 182 183 /** 184 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 185 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 186 * created from returned connection share zookeeper connection(if used), meta cache, and 187 * connections to region servers and masters. <br> 188 * The caller is responsible for calling {@link Connection#close()} on the returned connection 189 * instance. Typical usage: 190 * 191 * <pre> 192 * try (Connection connection = ConnectionFactory.createConnection(conf); 193 * Table table = connection.getTable(TableName.valueOf("table1"))) { 194 * table.get(...); 195 * ... 196 * } 197 * </pre> 198 * 199 * @param conf configuration 200 * @param pool the thread pool to use for batch operations 201 * @return Connection object for <code>conf</code> 202 */ 203 public static Connection createConnection(Configuration conf, ExecutorService pool) 204 throws IOException { 205 return createConnection(conf, pool, AuthUtil.loginClient(conf)); 206 } 207 208 /** 209 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 210 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 211 * created from returned connection share zookeeper connection(if used), meta cache, and 212 * connections to region servers and masters. <br> 213 * The caller is responsible for calling {@link Connection#close()} on the returned connection 214 * instance. Typical usage: 215 * 216 * <pre> 217 * try (Connection connection = ConnectionFactory.createConnection(conf); 218 * Table table = connection.getTable(TableName.valueOf("table1"))) { 219 * table.get(...); 220 * ... 221 * } 222 * </pre> 223 * 224 * @param connectionUri the connection uri for the hbase cluster 225 * @param conf configuration 226 * @param pool the thread pool to use for batch operations 227 * @return Connection object for <code>conf</code> 228 */ 229 public static Connection createConnection(URI connectionUri, Configuration conf, 230 ExecutorService pool) throws IOException { 231 return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf)); 232 } 233 234 /** 235 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 236 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 237 * created from returned connection share zookeeper connection(if used), meta cache, and 238 * connections to region servers and masters. <br> 239 * The caller is responsible for calling {@link Connection#close()} on the returned connection 240 * instance. Typical usage: 241 * 242 * <pre> 243 * try (Connection connection = ConnectionFactory.createConnection(conf); 244 * Table table = connection.getTable(TableName.valueOf("table1"))) { 245 * table.get(...); 246 * ... 247 * } 248 * </pre> 249 * 250 * @param conf configuration 251 * @param user the user the connection is for 252 * @return Connection object for <code>conf</code> 253 */ 254 public static Connection createConnection(Configuration conf, User user) throws IOException { 255 return createConnection(conf, null, user); 256 } 257 258 /** 259 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 260 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 261 * created from returned connection share zookeeper connection(if used), meta cache, and 262 * connections to region servers and masters. <br> 263 * The caller is responsible for calling {@link Connection#close()} on the returned connection 264 * instance. Typical usage: 265 * 266 * <pre> 267 * try (Connection connection = ConnectionFactory.createConnection(conf); 268 * Table table = connection.getTable(TableName.valueOf("table1"))) { 269 * table.get(...); 270 * ... 271 * } 272 * </pre> 273 * 274 * @param connectionUri the connection uri for the hbase cluster 275 * @param conf configuration 276 * @param user the user the connection is for 277 * @return Connection object for <code>conf</code> 278 */ 279 public static Connection createConnection(URI connectionUri, Configuration conf, User user) 280 throws IOException { 281 return createConnection(connectionUri, conf, null, user); 282 } 283 284 /** 285 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 286 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 287 * created from returned connection share zookeeper connection(if used), meta cache, and 288 * connections to region servers and masters. <br> 289 * The caller is responsible for calling {@link Connection#close()} on the returned connection 290 * instance. Typical usage: 291 * 292 * <pre> 293 * try (Connection connection = ConnectionFactory.createConnection(conf); 294 * Table table = connection.getTable(TableName.valueOf("table1"))) { 295 * table.get(...); 296 * ... 297 * } 298 * </pre> 299 * 300 * @param conf configuration 301 * @param user the user the connection is for 302 * @param pool the thread pool to use for batch operations 303 * @return Connection object for <code>conf</code> 304 */ 305 public static Connection createConnection(Configuration conf, ExecutorService pool, 306 final User user) throws IOException { 307 return createConnection(conf, pool, user, Collections.emptyMap()); 308 } 309 310 /** 311 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 312 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 313 * created from returned connection share zookeeper connection(if used), meta cache, and 314 * connections to region servers and masters. <br> 315 * The caller is responsible for calling {@link Connection#close()} on the returned connection 316 * instance. Typical usage: 317 * 318 * <pre> 319 * try (Connection connection = ConnectionFactory.createConnection(conf); 320 * Table table = connection.getTable(TableName.valueOf("table1"))) { 321 * table.get(...); 322 * ... 323 * } 324 * </pre> 325 * 326 * @param connectionUri the connection uri for the hbase cluster 327 * @param conf configuration 328 * @param user the user the connection is for 329 * @param pool the thread pool to use for batch operations 330 * @return Connection object for <code>conf</code> 331 */ 332 public static Connection createConnection(URI connectionUri, Configuration conf, 333 ExecutorService pool, User user) throws IOException { 334 return createConnection(connectionUri, conf, pool, user, Collections.emptyMap()); 335 } 336 337 /** 338 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 339 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 340 * created from returned connection share zookeeper connection(if used), meta cache, and 341 * connections to region servers and masters. <br> 342 * The caller is responsible for calling {@link Connection#close()} on the returned connection 343 * instance. Typical usage: 344 * 345 * <pre> 346 * try (Connection connection = ConnectionFactory.createConnection(conf); 347 * Table table = connection.getTable(TableName.valueOf("table1"))) { 348 * table.get(...); 349 * ... 350 * } 351 * </pre> 352 * 353 * @param conf configuration 354 * @param user the user the connection is for 355 * @param pool the thread pool to use for batch operations 356 * @param connectionAttributes attributes to be sent along to server during connection establish 357 * @return Connection object for <code>conf</code> 358 */ 359 public static Connection createConnection(Configuration conf, ExecutorService pool, 360 final User user, Map<String, byte[]> connectionAttributes) throws IOException { 361 return createConnection(null, conf, pool, user, connectionAttributes); 362 } 363 364 /** 365 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 366 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 367 * created from returned connection share zookeeper connection(if used), meta cache, and 368 * connections to region servers and masters. <br> 369 * The caller is responsible for calling {@link Connection#close()} on the returned connection 370 * instance. Typical usage: 371 * 372 * <pre> 373 * Connection connection = ConnectionFactory.createConnection(conf); 374 * Table table = connection.getTable(TableName.valueOf("table1")); 375 * try (Connection connection = ConnectionFactory.createConnection(conf); 376 * Table table = connection.getTable(TableName.valueOf("table1"))) { 377 * table.get(...); 378 * ... 379 * } 380 * </pre> 381 * 382 * @param connectionUri the connection uri for the hbase cluster 383 * @param conf configuration 384 * @param user the user the connection is for 385 * @param pool the thread pool to use for batch operations 386 * @param connectionAttributes attributes to be sent along to server during connection establish 387 * @return Connection object for <code>conf</code> 388 */ 389 public static Connection createConnection(URI connectionUri, Configuration conf, 390 ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes) 391 throws IOException { 392 Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 393 ConnectionOverAsyncConnection.class, Connection.class); 394 if (clazz != ConnectionOverAsyncConnection.class) { 395 return TraceUtil.trace(() -> { 396 try { 397 // Default HCM#HCI is not accessible; make it so before invoking. 398 Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, 399 ExecutorService.class, User.class, ConnectionRegistry.class, Map.class); 400 constructor.setAccessible(true); 401 ConnectionRegistry registry = connectionUri != null 402 ? ConnectionRegistryFactory.create(connectionUri, conf, user) 403 : ConnectionRegistryFactory.create(conf, user); 404 return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor 405 .newInstance(conf, pool, user, registry, connectionAttributes)); 406 } catch (NoSuchMethodException e) { 407 LOG.debug("Constructor with connection registry not found for class {}," 408 + " fallback to use old constructor", clazz.getName(), e); 409 } catch (Exception e) { 410 Throwables.throwIfInstanceOf(e, IOException.class); 411 Throwables.throwIfUnchecked(e); 412 throw new IOException(e); 413 } 414 415 try { 416 // Default HCM#HCI is not accessible; make it so before invoking. 417 Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, 418 ExecutorService.class, User.class, Map.class); 419 constructor.setAccessible(true); 420 return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor 421 .newInstance(conf, pool, user, connectionAttributes)); 422 } catch (Exception e) { 423 Throwables.throwIfInstanceOf(e, IOException.class); 424 Throwables.throwIfUnchecked(e); 425 throw new IOException(e); 426 } 427 }, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection")); 428 } else { 429 return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes)) 430 .toConnection(); 431 } 432 } 433 434 /** 435 * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration. 436 * @see #createAsyncConnection(Configuration) 437 * @return AsyncConnection object wrapped by CompletableFuture 438 */ 439 public static CompletableFuture<AsyncConnection> createAsyncConnection() { 440 return createAsyncConnection(HBaseConfiguration.create()); 441 } 442 443 /** 444 * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration. 445 * @param connectionUri the connection uri for the hbase cluster 446 * @see #createAsyncConnection(URI, Configuration) 447 * @return AsyncConnection object wrapped by CompletableFuture 448 */ 449 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) { 450 return createAsyncConnection(connectionUri, HBaseConfiguration.create()); 451 } 452 453 /** 454 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a 455 * User object created by {@link UserProvider}. The given {@code conf} will also be used to 456 * initialize the {@link UserProvider}. 457 * @param conf configuration 458 * @return AsyncConnection object wrapped by CompletableFuture 459 * @see #createAsyncConnection(Configuration, User) 460 * @see UserProvider 461 */ 462 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) { 463 return createAsyncConnection(null, conf); 464 } 465 466 /** 467 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri}, 468 * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will 469 * also be used to initialize the {@link UserProvider}. 470 * @param connectionUri the connection uri for the hbase cluster 471 * @param conf configuration 472 * @return AsyncConnection object wrapped by CompletableFuture 473 * @see #createAsyncConnection(Configuration, User) 474 * @see UserProvider 475 */ 476 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 477 Configuration conf) { 478 User user; 479 try { 480 user = AuthUtil.loginClient(conf); 481 } catch (IOException e) { 482 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 483 future.completeExceptionally(e); 484 return future; 485 } 486 return createAsyncConnection(connectionUri, conf, user); 487 } 488 489 /** 490 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 491 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 492 * interfaces created from returned connection share zookeeper connection, meta cache, and 493 * connections to region servers and masters. 494 * <p> 495 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 496 * connection instance. 497 * <p> 498 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 499 * as it is thread safe. 500 * @param conf configuration 501 * @param user the user the asynchronous connection is for 502 * @return AsyncConnection object wrapped by CompletableFuture 503 */ 504 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 505 final User user) { 506 return createAsyncConnection(null, conf, user); 507 } 508 509 /** 510 * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and 511 * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. 512 * All tables and interfaces created from returned connection share zookeeper connection(if used), 513 * meta cache, and connections to region servers and masters. 514 * <p> 515 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 516 * connection instance. 517 * <p> 518 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 519 * as it is thread safe. 520 * @param connectionUri the connection uri for the hbase cluster 521 * @param conf configuration 522 * @param user the user the asynchronous connection is for 523 * @return AsyncConnection object wrapped by CompletableFuture 524 */ 525 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 526 Configuration conf, final User user) { 527 return createAsyncConnection(connectionUri, conf, user, null); 528 } 529 530 /** 531 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 532 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 533 * interfaces created from returned connection share zookeeper connection, meta cache, and 534 * connections to region servers and masters. 535 * <p> 536 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 537 * connection instance. 538 * <p> 539 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 540 * as it is thread safe. 541 * @param conf configuration 542 * @param user the user the asynchronous connection is for 543 * @param connectionAttributes attributes to be sent along to server during connection establish 544 * @return AsyncConnection object wrapped by CompletableFuture 545 */ 546 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 547 final User user, Map<String, byte[]> connectionAttributes) { 548 return createAsyncConnection(null, conf, user, connectionAttributes); 549 } 550 551 /** 552 * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and 553 * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. 554 * All tables and interfaces created from returned connection share zookeeper connection(if used), 555 * meta cache, and connections to region servers and masters. 556 * <p> 557 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 558 * connection instance. 559 * <p> 560 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 561 * as it is thread safe. 562 * @param connectionUri the connection uri for the hbase cluster 563 * @param conf configuration 564 * @param user the user the asynchronous connection is for 565 * @param connectionAttributes attributes to be sent along to server during connection establish 566 * @return AsyncConnection object wrapped by CompletableFuture 567 */ 568 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 569 Configuration conf, final User user, Map<String, byte[]> connectionAttributes) { 570 return TraceUtil.tracedFuture(() -> { 571 ConnectionRegistry registry; 572 try { 573 registry = connectionUri != null 574 ? ConnectionRegistryFactory.create(connectionUri, conf, user) 575 : ConnectionRegistryFactory.create(conf, user); 576 } catch (Exception e) { 577 return FutureUtils.failedFuture(e); 578 } 579 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 580 addListener(registry.getClusterId(), (clusterId, error) -> { 581 if (error != null) { 582 registry.close(); 583 future.completeExceptionally(error); 584 return; 585 } 586 if (clusterId == null) { 587 registry.close(); 588 future.completeExceptionally(new IOException("clusterid came back null")); 589 return; 590 } 591 Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, 592 AsyncConnectionImpl.class, AsyncConnection.class); 593 try { 594 future.complete( 595 user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils 596 .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes))); 597 } catch (Exception e) { 598 registry.close(); 599 future.completeExceptionally(e); 600 } 601 }); 602 return future; 603 }, "ConnectionFactory.createAsyncConnection"); 604 } 605}