001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.lang.reflect.Constructor; 024import java.net.URI; 025import java.security.PrivilegedExceptionAction; 026import java.util.Collections; 027import java.util.Map; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.ExecutorService; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.AuthUtil; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.security.User; 034import org.apache.hadoop.hbase.security.UserProvider; 035import org.apache.hadoop.hbase.trace.TraceUtil; 036import org.apache.hadoop.hbase.util.FutureUtils; 037import org.apache.hadoop.hbase.util.ReflectionUtils; 038import org.apache.hadoop.hbase.util.Strings; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 044 045/** 046 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of 047 * the {@link Connection}s to the cluster is the responsibility of the caller. From a 048 * {@link Connection}, {@link Table} implementations are retrieved with 049 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example: 050 * 051 * <pre> 052 * Connection connection = ConnectionFactory.createConnection(config); 053 * Table table = connection.getTable(TableName.valueOf("table1")); 054 * try { 055 * // Use the table as needed, for a single operation and a single thread 056 * } finally { 057 * table.close(); 058 * connection.close(); 059 * } 060 * </pre> 061 * 062 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos 063 * credentials if caller has following two configurations set: 064 * <ul> 065 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem 066 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use 067 * </ul> 068 * By this way, caller can directly connect to kerberized cluster without caring login and 069 * credentials renewal logic in application. 070 * 071 * <pre> 072 * </pre> 073 * 074 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator} 075 * implementations. 076 * @see Connection 077 * @since 0.99.0 078 */ 079@InterfaceAudience.Public 080public class ConnectionFactory { 081 082 private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactory.class); 083 084 public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = 085 "hbase.client.async.connection.impl"; 086 087 /** No public c.tors */ 088 protected ConnectionFactory() { 089 } 090 091 /** 092 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 093 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 094 * connection share zookeeper connection, meta cache, and connections to region servers and 095 * masters. <br> 096 * The caller is responsible for calling {@link Connection#close()} on the returned connection 097 * instance. Typical usage: 098 * 099 * <pre> 100 * try (Connection connection = ConnectionFactory.createConnection(conf); 101 * Table table = connection.getTable(TableName.valueOf("table1"))) { 102 * table.get(...); 103 * ... 104 * } 105 * </pre> 106 * 107 * @return Connection object for <code>conf</code> 108 */ 109 public static Connection createConnection() throws IOException { 110 return createConnection(HBaseConfiguration.create()); 111 } 112 113 /** 114 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 115 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 116 * connection share zookeeper connection, meta cache, and connections to region servers and 117 * masters. <br> 118 * The caller is responsible for calling {@link Connection#close()} on the returned connection 119 * instance. Typical usage: 120 * 121 * <pre> 122 * try (Connection connection = ConnectionFactory.createConnection(conf); 123 * Table table = connection.getTable(TableName.valueOf("table1"))) { 124 * table.get(...); 125 * ... 126 * } 127 * </pre> 128 * 129 * @param connectionUri the connection uri for the hbase cluster 130 * @return Connection object for <code>conf</code> 131 */ 132 public static Connection createConnection(URI connectionUri) throws IOException { 133 return createConnection(connectionUri, HBaseConfiguration.create()); 134 } 135 136 /** 137 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 138 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 139 * created from returned connection share zookeeper connection(if used), meta cache, and 140 * connections to region servers and masters. <br> 141 * The caller is responsible for calling {@link Connection#close()} on the returned connection 142 * instance. Typical usage: 143 * 144 * <pre> 145 * try (Connection connection = ConnectionFactory.createConnection(conf); 146 * Table table = connection.getTable(TableName.valueOf("table1"))) { 147 * table.get(...); 148 * ... 149 * } 150 * </pre> 151 * 152 * @param conf configuration 153 * @return Connection object for <code>conf</code> 154 */ 155 public static Connection createConnection(Configuration conf) throws IOException { 156 return createConnection(conf, null, AuthUtil.loginClient(conf)); 157 } 158 159 /** 160 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 161 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 162 * created from returned connection share zookeeper connection(if used), meta cache, and 163 * connections to region servers and masters. <br> 164 * The caller is responsible for calling {@link Connection#close()} on the returned connection 165 * instance. Typical usage: 166 * 167 * <pre> 168 * try (Connection connection = ConnectionFactory.createConnection(conf); 169 * Table table = connection.getTable(TableName.valueOf("table1"))) { 170 * table.get(...); 171 * ... 172 * } 173 * </pre> 174 * 175 * @param connectionUri the connection uri for the hbase cluster 176 * @param conf configuration 177 * @return Connection object for <code>conf</code> 178 */ 179 public static Connection createConnection(URI connectionUri, Configuration conf) 180 throws IOException { 181 return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf)); 182 } 183 184 /** 185 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 186 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 187 * created from returned connection share zookeeper connection(if used), meta cache, and 188 * connections to region servers and masters. <br> 189 * The caller is responsible for calling {@link Connection#close()} on the returned connection 190 * instance. Typical usage: 191 * 192 * <pre> 193 * try (Connection connection = ConnectionFactory.createConnection(conf); 194 * Table table = connection.getTable(TableName.valueOf("table1"))) { 195 * table.get(...); 196 * ... 197 * } 198 * </pre> 199 * 200 * @param conf configuration 201 * @param pool the thread pool to use for batch operations 202 * @return Connection object for <code>conf</code> 203 */ 204 public static Connection createConnection(Configuration conf, ExecutorService pool) 205 throws IOException { 206 return createConnection(conf, pool, AuthUtil.loginClient(conf)); 207 } 208 209 /** 210 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 211 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 212 * created from returned connection share zookeeper connection(if used), meta cache, and 213 * connections to region servers and masters. <br> 214 * The caller is responsible for calling {@link Connection#close()} on the returned connection 215 * instance. Typical usage: 216 * 217 * <pre> 218 * try (Connection connection = ConnectionFactory.createConnection(conf); 219 * Table table = connection.getTable(TableName.valueOf("table1"))) { 220 * table.get(...); 221 * ... 222 * } 223 * </pre> 224 * 225 * @param connectionUri the connection uri for the hbase cluster 226 * @param conf configuration 227 * @param pool the thread pool to use for batch operations 228 * @return Connection object for <code>conf</code> 229 */ 230 public static Connection createConnection(URI connectionUri, Configuration conf, 231 ExecutorService pool) throws IOException { 232 return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf)); 233 } 234 235 /** 236 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 237 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 238 * created from returned connection share zookeeper connection(if used), meta cache, and 239 * connections to region servers and masters. <br> 240 * The caller is responsible for calling {@link Connection#close()} on the returned connection 241 * instance. Typical usage: 242 * 243 * <pre> 244 * try (Connection connection = ConnectionFactory.createConnection(conf); 245 * Table table = connection.getTable(TableName.valueOf("table1"))) { 246 * table.get(...); 247 * ... 248 * } 249 * </pre> 250 * 251 * @param conf configuration 252 * @param user the user the connection is for 253 * @return Connection object for <code>conf</code> 254 */ 255 public static Connection createConnection(Configuration conf, User user) throws IOException { 256 return createConnection(conf, null, user); 257 } 258 259 /** 260 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 261 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 262 * created from returned connection share zookeeper connection(if used), meta cache, and 263 * connections to region servers and masters. <br> 264 * The caller is responsible for calling {@link Connection#close()} on the returned connection 265 * instance. Typical usage: 266 * 267 * <pre> 268 * try (Connection connection = ConnectionFactory.createConnection(conf); 269 * Table table = connection.getTable(TableName.valueOf("table1"))) { 270 * table.get(...); 271 * ... 272 * } 273 * </pre> 274 * 275 * @param connectionUri the connection uri for the hbase cluster 276 * @param conf configuration 277 * @param user the user the connection is for 278 * @return Connection object for <code>conf</code> 279 */ 280 public static Connection createConnection(URI connectionUri, Configuration conf, User user) 281 throws IOException { 282 return createConnection(connectionUri, conf, null, user); 283 } 284 285 /** 286 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 287 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 288 * created from returned connection share zookeeper connection(if used), meta cache, and 289 * connections to region servers and masters. <br> 290 * The caller is responsible for calling {@link Connection#close()} on the returned connection 291 * instance. Typical usage: 292 * 293 * <pre> 294 * try (Connection connection = ConnectionFactory.createConnection(conf); 295 * Table table = connection.getTable(TableName.valueOf("table1"))) { 296 * table.get(...); 297 * ... 298 * } 299 * </pre> 300 * 301 * @param conf configuration 302 * @param user the user the connection is for 303 * @param pool the thread pool to use for batch operations 304 * @return Connection object for <code>conf</code> 305 */ 306 public static Connection createConnection(Configuration conf, ExecutorService pool, 307 final User user) throws IOException { 308 return createConnection(conf, pool, user, Collections.emptyMap()); 309 } 310 311 /** 312 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 313 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 314 * created from returned connection share zookeeper connection(if used), meta cache, and 315 * connections to region servers and masters. <br> 316 * The caller is responsible for calling {@link Connection#close()} on the returned connection 317 * instance. Typical usage: 318 * 319 * <pre> 320 * try (Connection connection = ConnectionFactory.createConnection(conf); 321 * Table table = connection.getTable(TableName.valueOf("table1"))) { 322 * table.get(...); 323 * ... 324 * } 325 * </pre> 326 * 327 * @param connectionUri the connection uri for the hbase cluster 328 * @param conf configuration 329 * @param user the user the connection is for 330 * @param pool the thread pool to use for batch operations 331 * @return Connection object for <code>conf</code> 332 */ 333 public static Connection createConnection(URI connectionUri, Configuration conf, 334 ExecutorService pool, User user) throws IOException { 335 return createConnection(connectionUri, conf, pool, user, Collections.emptyMap()); 336 } 337 338 /** 339 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 340 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 341 * created from returned connection share zookeeper connection(if used), meta cache, and 342 * connections to region servers and masters. <br> 343 * The caller is responsible for calling {@link Connection#close()} on the returned connection 344 * instance. Typical usage: 345 * 346 * <pre> 347 * try (Connection connection = ConnectionFactory.createConnection(conf); 348 * Table table = connection.getTable(TableName.valueOf("table1"))) { 349 * table.get(...); 350 * ... 351 * } 352 * </pre> 353 * 354 * @param conf configuration 355 * @param user the user the connection is for 356 * @param pool the thread pool to use for batch operations 357 * @param connectionAttributes attributes to be sent along to server during connection establish 358 * @return Connection object for <code>conf</code> 359 */ 360 public static Connection createConnection(Configuration conf, ExecutorService pool, 361 final User user, Map<String, byte[]> connectionAttributes) throws IOException { 362 return createConnection(null, conf, pool, user, connectionAttributes); 363 } 364 365 /** 366 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 367 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 368 * created from returned connection share zookeeper connection(if used), meta cache, and 369 * connections to region servers and masters. <br> 370 * The caller is responsible for calling {@link Connection#close()} on the returned connection 371 * instance. Typical usage: 372 * 373 * <pre> 374 * Connection connection = ConnectionFactory.createConnection(conf); 375 * Table table = connection.getTable(TableName.valueOf("table1")); 376 * try (Connection connection = ConnectionFactory.createConnection(conf); 377 * Table table = connection.getTable(TableName.valueOf("table1"))) { 378 * table.get(...); 379 * ... 380 * } 381 * </pre> 382 * 383 * @param connectionUri the connection uri for the hbase cluster 384 * @param conf configuration 385 * @param user the user the connection is for 386 * @param pool the thread pool to use for batch operations 387 * @param connectionAttributes attributes to be sent along to server during connection establish 388 * @return Connection object for <code>conf</code> 389 */ 390 public static Connection createConnection(URI connectionUri, Configuration conf, 391 ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes) 392 throws IOException { 393 Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 394 ConnectionOverAsyncConnection.class, Connection.class); 395 if (clazz != ConnectionOverAsyncConnection.class) { 396 return TraceUtil.trace(() -> { 397 try { 398 // Default HCM#HCI is not accessible; make it so before invoking. 399 Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, 400 ExecutorService.class, User.class, ConnectionRegistry.class, Map.class); 401 constructor.setAccessible(true); 402 ConnectionRegistry registry = connectionUri != null 403 ? ConnectionRegistryFactory.create(connectionUri, conf, user) 404 : ConnectionRegistryFactory.create(conf, user); 405 return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor 406 .newInstance(conf, pool, user, registry, connectionAttributes)); 407 } catch (NoSuchMethodException e) { 408 LOG.debug("Constructor with connection registry not found for class {}," 409 + " fallback to use old constructor", clazz.getName(), e); 410 } catch (Exception e) { 411 Throwables.throwIfInstanceOf(e, IOException.class); 412 Throwables.throwIfUnchecked(e); 413 throw new IOException(e); 414 } 415 416 try { 417 // Default HCM#HCI is not accessible; make it so before invoking. 418 Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, 419 ExecutorService.class, User.class, Map.class); 420 constructor.setAccessible(true); 421 return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor 422 .newInstance(conf, pool, user, connectionAttributes)); 423 } catch (Exception e) { 424 Throwables.throwIfInstanceOf(e, IOException.class); 425 Throwables.throwIfUnchecked(e); 426 throw new IOException(e); 427 } 428 }, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection")); 429 } else { 430 return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes)) 431 .toConnection(); 432 } 433 } 434 435 /** 436 * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration. 437 * @see #createAsyncConnection(Configuration) 438 * @return AsyncConnection object wrapped by CompletableFuture 439 */ 440 public static CompletableFuture<AsyncConnection> createAsyncConnection() { 441 return createAsyncConnection(HBaseConfiguration.create()); 442 } 443 444 /** 445 * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration. 446 * @param connectionUri the connection uri for the hbase cluster 447 * @see #createAsyncConnection(URI, Configuration) 448 * @return AsyncConnection object wrapped by CompletableFuture 449 */ 450 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) { 451 return createAsyncConnection(connectionUri, HBaseConfiguration.create()); 452 } 453 454 /** 455 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a 456 * User object created by {@link UserProvider}. The given {@code conf} will also be used to 457 * initialize the {@link UserProvider}. 458 * @param conf configuration 459 * @return AsyncConnection object wrapped by CompletableFuture 460 * @see #createAsyncConnection(Configuration, User) 461 * @see UserProvider 462 */ 463 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) { 464 return createAsyncConnection(null, conf); 465 } 466 467 /** 468 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri}, 469 * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will 470 * also be used to initialize the {@link UserProvider}. 471 * @param connectionUri the connection uri for the hbase cluster 472 * @param conf configuration 473 * @return AsyncConnection object wrapped by CompletableFuture 474 * @see #createAsyncConnection(Configuration, User) 475 * @see UserProvider 476 */ 477 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 478 Configuration conf) { 479 User user; 480 try { 481 user = AuthUtil.loginClient(conf); 482 } catch (IOException e) { 483 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 484 future.completeExceptionally(e); 485 return future; 486 } 487 return createAsyncConnection(connectionUri, conf, user); 488 } 489 490 /** 491 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 492 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 493 * interfaces created from returned connection share zookeeper connection, meta cache, and 494 * connections to region servers and masters. 495 * <p> 496 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 497 * connection instance. 498 * <p> 499 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 500 * as it is thread safe. 501 * @param conf configuration 502 * @param user the user the asynchronous connection is for 503 * @return AsyncConnection object wrapped by CompletableFuture 504 */ 505 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 506 final User user) { 507 return createAsyncConnection(null, conf, user); 508 } 509 510 /** 511 * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and 512 * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. 513 * All tables and interfaces created from returned connection share zookeeper connection(if used), 514 * meta cache, and connections to region servers and masters. 515 * <p> 516 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 517 * connection instance. 518 * <p> 519 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 520 * as it is thread safe. 521 * @param connectionUri the connection uri for the hbase cluster 522 * @param conf configuration 523 * @param user the user the asynchronous connection is for 524 * @return AsyncConnection object wrapped by CompletableFuture 525 */ 526 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 527 Configuration conf, final User user) { 528 return createAsyncConnection(connectionUri, conf, user, null); 529 } 530 531 /** 532 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 533 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 534 * interfaces created from returned connection share zookeeper connection, meta cache, and 535 * connections to region servers and masters. 536 * <p> 537 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 538 * connection instance. 539 * <p> 540 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 541 * as it is thread safe. 542 * @param conf configuration 543 * @param user the user the asynchronous connection is for 544 * @param connectionAttributes attributes to be sent along to server during connection establish 545 * @return AsyncConnection object wrapped by CompletableFuture 546 */ 547 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 548 final User user, Map<String, byte[]> connectionAttributes) { 549 return createAsyncConnection(null, conf, user, connectionAttributes); 550 } 551 552 /** 553 * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and 554 * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. 555 * All tables and interfaces created from returned connection share zookeeper connection(if used), 556 * meta cache, and connections to region servers and masters. 557 * <p> 558 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 559 * connection instance. 560 * <p> 561 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 562 * as it is thread safe. 563 * @param connectionUri the connection uri for the hbase cluster 564 * @param conf configuration 565 * @param user the user the asynchronous connection is for 566 * @param connectionAttributes attributes to be sent along to server during connection establish 567 * @return AsyncConnection object wrapped by CompletableFuture 568 */ 569 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 570 Configuration conf, final User user, Map<String, byte[]> connectionAttributes) { 571 return TraceUtil.tracedFuture(() -> { 572 ConnectionRegistry registry; 573 Configuration appliedConf; 574 try { 575 if (connectionUri != null) { 576 appliedConf = new Configuration(conf); 577 Strings.applyURIQueriesToConf(connectionUri, appliedConf); 578 registry = ConnectionRegistryFactory.create(connectionUri, appliedConf, user); 579 } else { 580 appliedConf = conf; 581 registry = ConnectionRegistryFactory.create(appliedConf, user); 582 } 583 } catch (Exception e) { 584 return FutureUtils.failedFuture(e); 585 } 586 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 587 addListener(registry.getClusterId(), (clusterId, error) -> { 588 if (error != null) { 589 registry.close(); 590 future.completeExceptionally(error); 591 return; 592 } 593 if (clusterId == null) { 594 registry.close(); 595 future.completeExceptionally(new IOException("clusterid came back null")); 596 return; 597 } 598 Class<? extends AsyncConnection> clazz = appliedConf.getClass( 599 HBASE_CLIENT_ASYNC_CONNECTION_IMPL, AsyncConnectionImpl.class, AsyncConnection.class); 600 try { 601 future.complete(user.runAs((PrivilegedExceptionAction< 602 ? extends AsyncConnection>) () -> ReflectionUtils.newInstance(clazz, appliedConf, 603 registry, clusterId, null, user, connectionAttributes))); 604 } catch (Exception e) { 605 registry.close(); 606 future.completeExceptionally(e); 607 } 608 }); 609 return future; 610 }, "ConnectionFactory.createAsyncConnection"); 611 } 612}