001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.lang.reflect.Constructor; 024import java.net.URI; 025import java.security.PrivilegedExceptionAction; 026import java.util.Collections; 027import java.util.Map; 028import java.util.concurrent.CompletableFuture; 029import java.util.concurrent.ExecutorService; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.AuthUtil; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.security.User; 034import org.apache.hadoop.hbase.security.UserProvider; 035import org.apache.hadoop.hbase.trace.TraceUtil; 036import org.apache.hadoop.hbase.util.FutureUtils; 037import org.apache.hadoop.hbase.util.ReflectionUtils; 038import org.apache.yetus.audience.InterfaceAudience; 039 040/** 041 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of 042 * the {@link Connection}s to the cluster is the responsibility of the caller. From a 043 * {@link Connection}, {@link Table} implementations are retrieved with 044 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example: 045 * 046 * <pre> 047 * Connection connection = ConnectionFactory.createConnection(config); 048 * Table table = connection.getTable(TableName.valueOf("table1")); 049 * try { 050 * // Use the table as needed, for a single operation and a single thread 051 * } finally { 052 * table.close(); 053 * connection.close(); 054 * } 055 * </pre> 056 * 057 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos 058 * credentials if caller has following two configurations set: 059 * <ul> 060 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem 061 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use 062 * </ul> 063 * By this way, caller can directly connect to kerberized cluster without caring login and 064 * credentials renewal logic in application. 065 * 066 * <pre> 067 * </pre> 068 * 069 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator} 070 * implementations. 071 * @see Connection 072 * @since 0.99.0 073 */ 074@InterfaceAudience.Public 075public class ConnectionFactory { 076 077 public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = 078 "hbase.client.async.connection.impl"; 079 080 /** No public c.tors */ 081 protected ConnectionFactory() { 082 } 083 084 /** 085 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 086 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 087 * connection share zookeeper connection, meta cache, and connections to region servers and 088 * masters. <br> 089 * The caller is responsible for calling {@link Connection#close()} on the returned connection 090 * instance. Typical usage: 091 * 092 * <pre> 093 * try (Connection connection = ConnectionFactory.createConnection(conf); 094 * Table table = connection.getTable(TableName.valueOf("table1"))) { 095 * table.get(...); 096 * ... 097 * } 098 * </pre> 099 * 100 * @return Connection object for <code>conf</code> 101 */ 102 public static Connection createConnection() throws IOException { 103 return createConnection(HBaseConfiguration.create()); 104 } 105 106 /** 107 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 108 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 109 * connection share zookeeper connection, meta cache, and connections to region servers and 110 * masters. <br> 111 * The caller is responsible for calling {@link Connection#close()} on the returned connection 112 * instance. Typical usage: 113 * 114 * <pre> 115 * try (Connection connection = ConnectionFactory.createConnection(conf); 116 * Table table = connection.getTable(TableName.valueOf("table1"))) { 117 * table.get(...); 118 * ... 119 * } 120 * </pre> 121 * 122 * @param connectionUri the connection uri for the hbase cluster 123 * @return Connection object for <code>conf</code> 124 */ 125 public static Connection createConnection(URI connectionUri) throws IOException { 126 return createConnection(connectionUri, HBaseConfiguration.create()); 127 } 128 129 /** 130 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 131 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 132 * created from returned connection share zookeeper connection(if used), meta cache, and 133 * connections to region servers and masters. <br> 134 * The caller is responsible for calling {@link Connection#close()} on the returned connection 135 * instance. Typical usage: 136 * 137 * <pre> 138 * try (Connection connection = ConnectionFactory.createConnection(conf); 139 * Table table = connection.getTable(TableName.valueOf("table1"))) { 140 * table.get(...); 141 * ... 142 * } 143 * </pre> 144 * 145 * @param conf configuration 146 * @return Connection object for <code>conf</code> 147 */ 148 public static Connection createConnection(Configuration conf) throws IOException { 149 return createConnection(conf, null, AuthUtil.loginClient(conf)); 150 } 151 152 /** 153 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 154 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 155 * created from returned connection share zookeeper connection(if used), meta cache, and 156 * connections to region servers and masters. <br> 157 * The caller is responsible for calling {@link Connection#close()} on the returned connection 158 * instance. Typical usage: 159 * 160 * <pre> 161 * try (Connection connection = ConnectionFactory.createConnection(conf); 162 * Table table = connection.getTable(TableName.valueOf("table1"))) { 163 * table.get(...); 164 * ... 165 * } 166 * </pre> 167 * 168 * @param connectionUri the connection uri for the hbase cluster 169 * @param conf configuration 170 * @return Connection object for <code>conf</code> 171 */ 172 public static Connection createConnection(URI connectionUri, Configuration conf) 173 throws IOException { 174 return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf)); 175 } 176 177 /** 178 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 179 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 180 * created from returned connection share zookeeper connection(if used), meta cache, and 181 * connections to region servers and masters. <br> 182 * The caller is responsible for calling {@link Connection#close()} on the returned connection 183 * instance. Typical usage: 184 * 185 * <pre> 186 * try (Connection connection = ConnectionFactory.createConnection(conf); 187 * Table table = connection.getTable(TableName.valueOf("table1"))) { 188 * table.get(...); 189 * ... 190 * } 191 * </pre> 192 * 193 * @param conf configuration 194 * @param pool the thread pool to use for batch operations 195 * @return Connection object for <code>conf</code> 196 */ 197 public static Connection createConnection(Configuration conf, ExecutorService pool) 198 throws IOException { 199 return createConnection(conf, pool, AuthUtil.loginClient(conf)); 200 } 201 202 /** 203 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 204 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 205 * created from returned connection share zookeeper connection(if used), meta cache, and 206 * connections to region servers and masters. <br> 207 * The caller is responsible for calling {@link Connection#close()} on the returned connection 208 * instance. Typical usage: 209 * 210 * <pre> 211 * try (Connection connection = ConnectionFactory.createConnection(conf); 212 * Table table = connection.getTable(TableName.valueOf("table1"))) { 213 * table.get(...); 214 * ... 215 * } 216 * </pre> 217 * 218 * @param connectionUri the connection uri for the hbase cluster 219 * @param conf configuration 220 * @param pool the thread pool to use for batch operations 221 * @return Connection object for <code>conf</code> 222 */ 223 public static Connection createConnection(URI connectionUri, Configuration conf, 224 ExecutorService pool) throws IOException { 225 return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf)); 226 } 227 228 /** 229 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 230 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 231 * created from returned connection share zookeeper connection(if used), meta cache, and 232 * connections to region servers and masters. <br> 233 * The caller is responsible for calling {@link Connection#close()} on the returned connection 234 * instance. Typical usage: 235 * 236 * <pre> 237 * try (Connection connection = ConnectionFactory.createConnection(conf); 238 * Table table = connection.getTable(TableName.valueOf("table1"))) { 239 * table.get(...); 240 * ... 241 * } 242 * </pre> 243 * 244 * @param conf configuration 245 * @param user the user the connection is for 246 * @return Connection object for <code>conf</code> 247 */ 248 public static Connection createConnection(Configuration conf, User user) throws IOException { 249 return createConnection(conf, null, user); 250 } 251 252 /** 253 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 254 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 255 * created from returned connection share zookeeper connection(if used), meta cache, and 256 * connections to region servers and masters. <br> 257 * The caller is responsible for calling {@link Connection#close()} on the returned connection 258 * instance. Typical usage: 259 * 260 * <pre> 261 * try (Connection connection = ConnectionFactory.createConnection(conf); 262 * Table table = connection.getTable(TableName.valueOf("table1"))) { 263 * table.get(...); 264 * ... 265 * } 266 * </pre> 267 * 268 * @param connectionUri the connection uri for the hbase cluster 269 * @param conf configuration 270 * @param user the user the connection is for 271 * @return Connection object for <code>conf</code> 272 */ 273 public static Connection createConnection(URI connectionUri, Configuration conf, User user) 274 throws IOException { 275 return createConnection(connectionUri, conf, null, user); 276 } 277 278 /** 279 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 280 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 281 * created from returned connection share zookeeper connection(if used), meta cache, and 282 * connections to region servers and masters. <br> 283 * The caller is responsible for calling {@link Connection#close()} on the returned connection 284 * instance. Typical usage: 285 * 286 * <pre> 287 * try (Connection connection = ConnectionFactory.createConnection(conf); 288 * Table table = connection.getTable(TableName.valueOf("table1"))) { 289 * table.get(...); 290 * ... 291 * } 292 * </pre> 293 * 294 * @param conf configuration 295 * @param user the user the connection is for 296 * @param pool the thread pool to use for batch operations 297 * @return Connection object for <code>conf</code> 298 */ 299 public static Connection createConnection(Configuration conf, ExecutorService pool, 300 final User user) throws IOException { 301 return createConnection(conf, pool, user, Collections.emptyMap()); 302 } 303 304 /** 305 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 306 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 307 * created from returned connection share zookeeper connection(if used), meta cache, and 308 * connections to region servers and masters. <br> 309 * The caller is responsible for calling {@link Connection#close()} on the returned connection 310 * instance. Typical usage: 311 * 312 * <pre> 313 * try (Connection connection = ConnectionFactory.createConnection(conf); 314 * Table table = connection.getTable(TableName.valueOf("table1"))) { 315 * table.get(...); 316 * ... 317 * } 318 * </pre> 319 * 320 * @param connectionUri the connection uri for the hbase cluster 321 * @param conf configuration 322 * @param user the user the connection is for 323 * @param pool the thread pool to use for batch operations 324 * @return Connection object for <code>conf</code> 325 */ 326 public static Connection createConnection(URI connectionUri, Configuration conf, 327 ExecutorService pool, User user) throws IOException { 328 return createConnection(connectionUri, conf, pool, user, Collections.emptyMap()); 329 } 330 331 /** 332 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 333 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 334 * created from returned connection share zookeeper connection(if used), meta cache, and 335 * connections to region servers and masters. <br> 336 * The caller is responsible for calling {@link Connection#close()} on the returned connection 337 * instance. Typical usage: 338 * 339 * <pre> 340 * try (Connection connection = ConnectionFactory.createConnection(conf); 341 * Table table = connection.getTable(TableName.valueOf("table1"))) { 342 * table.get(...); 343 * ... 344 * } 345 * </pre> 346 * 347 * @param conf configuration 348 * @param user the user the connection is for 349 * @param pool the thread pool to use for batch operations 350 * @param connectionAttributes attributes to be sent along to server during connection establish 351 * @return Connection object for <code>conf</code> 352 */ 353 public static Connection createConnection(Configuration conf, ExecutorService pool, 354 final User user, Map<String, byte[]> connectionAttributes) throws IOException { 355 return createConnection(null, conf, pool, user, connectionAttributes); 356 } 357 358 /** 359 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 360 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 361 * created from returned connection share zookeeper connection(if used), meta cache, and 362 * connections to region servers and masters. <br> 363 * The caller is responsible for calling {@link Connection#close()} on the returned connection 364 * instance. Typical usage: 365 * 366 * <pre> 367 * Connection connection = ConnectionFactory.createConnection(conf); 368 * Table table = connection.getTable(TableName.valueOf("table1")); 369 * try (Connection connection = ConnectionFactory.createConnection(conf); 370 * Table table = connection.getTable(TableName.valueOf("table1"))) { 371 * table.get(...); 372 * ... 373 * } 374 * </pre> 375 * 376 * @param connectionUri the connection uri for the hbase cluster 377 * @param conf configuration 378 * @param user the user the connection is for 379 * @param pool the thread pool to use for batch operations 380 * @param connectionAttributes attributes to be sent along to server during connection establish 381 * @return Connection object for <code>conf</code> 382 */ 383 public static Connection createConnection(URI connectionUri, Configuration conf, 384 ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes) 385 throws IOException { 386 Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 387 ConnectionOverAsyncConnection.class, Connection.class); 388 if (clazz != ConnectionOverAsyncConnection.class) { 389 try { 390 // Default HCM#HCI is not accessible; make it so before invoking. 391 Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, 392 ExecutorService.class, User.class, Map.class); 393 constructor.setAccessible(true); 394 return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor 395 .newInstance(conf, pool, user, connectionAttributes)); 396 } catch (Exception e) { 397 throw new IOException(e); 398 } 399 } else { 400 return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes)) 401 .toConnection(); 402 } 403 } 404 405 /** 406 * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration. 407 * @see #createAsyncConnection(Configuration) 408 * @return AsyncConnection object wrapped by CompletableFuture 409 */ 410 public static CompletableFuture<AsyncConnection> createAsyncConnection() { 411 return createAsyncConnection(HBaseConfiguration.create()); 412 } 413 414 /** 415 * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration. 416 * @param connectionUri the connection uri for the hbase cluster 417 * @see #createAsyncConnection(URI, Configuration) 418 * @return AsyncConnection object wrapped by CompletableFuture 419 */ 420 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) { 421 return createAsyncConnection(connectionUri, HBaseConfiguration.create()); 422 } 423 424 /** 425 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a 426 * User object created by {@link UserProvider}. The given {@code conf} will also be used to 427 * initialize the {@link UserProvider}. 428 * @param conf configuration 429 * @return AsyncConnection object wrapped by CompletableFuture 430 * @see #createAsyncConnection(Configuration, User) 431 * @see UserProvider 432 */ 433 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) { 434 return createAsyncConnection(null, conf); 435 } 436 437 /** 438 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri}, 439 * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will 440 * also be used to initialize the {@link UserProvider}. 441 * @param connectionUri the connection uri for the hbase cluster 442 * @param conf configuration 443 * @return AsyncConnection object wrapped by CompletableFuture 444 * @see #createAsyncConnection(Configuration, User) 445 * @see UserProvider 446 */ 447 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 448 Configuration conf) { 449 User user; 450 try { 451 user = AuthUtil.loginClient(conf); 452 } catch (IOException e) { 453 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 454 future.completeExceptionally(e); 455 return future; 456 } 457 return createAsyncConnection(connectionUri, conf, user); 458 } 459 460 /** 461 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 462 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 463 * interfaces created from returned connection share zookeeper connection, meta cache, and 464 * connections to region servers and masters. 465 * <p> 466 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 467 * connection instance. 468 * <p> 469 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 470 * as it is thread safe. 471 * @param conf configuration 472 * @param user the user the asynchronous connection is for 473 * @return AsyncConnection object wrapped by CompletableFuture 474 */ 475 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 476 final User user) { 477 return createAsyncConnection(null, conf, user); 478 } 479 480 /** 481 * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and 482 * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. 483 * All tables and interfaces created from returned connection share zookeeper connection(if used), 484 * meta cache, and connections to region servers and masters. 485 * <p> 486 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 487 * connection instance. 488 * <p> 489 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 490 * as it is thread safe. 491 * @param connectionUri the connection uri for the hbase cluster 492 * @param conf configuration 493 * @param user the user the asynchronous connection is for 494 * @return AsyncConnection object wrapped by CompletableFuture 495 */ 496 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 497 Configuration conf, final User user) { 498 return createAsyncConnection(connectionUri, conf, user, null); 499 } 500 501 /** 502 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 503 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 504 * interfaces created from returned connection share zookeeper connection, meta cache, and 505 * connections to region servers and masters. 506 * <p> 507 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 508 * connection instance. 509 * <p> 510 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 511 * as it is thread safe. 512 * @param conf configuration 513 * @param user the user the asynchronous connection is for 514 * @param connectionAttributes attributes to be sent along to server during connection establish 515 * @return AsyncConnection object wrapped by CompletableFuture 516 */ 517 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 518 final User user, Map<String, byte[]> connectionAttributes) { 519 return createAsyncConnection(null, conf, user, connectionAttributes); 520 } 521 522 /** 523 * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and 524 * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster. 525 * All tables and interfaces created from returned connection share zookeeper connection(if used), 526 * meta cache, and connections to region servers and masters. 527 * <p> 528 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 529 * connection instance. 530 * <p> 531 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 532 * as it is thread safe. 533 * @param connectionUri the connection uri for the hbase cluster 534 * @param conf configuration 535 * @param user the user the asynchronous connection is for 536 * @param connectionAttributes attributes to be sent along to server during connection establish 537 * @return AsyncConnection object wrapped by CompletableFuture 538 */ 539 public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri, 540 Configuration conf, final User user, Map<String, byte[]> connectionAttributes) { 541 return TraceUtil.tracedFuture(() -> { 542 ConnectionRegistry registry; 543 try { 544 registry = connectionUri != null 545 ? ConnectionRegistryFactory.create(connectionUri, conf, user) 546 : ConnectionRegistryFactory.create(conf, user); 547 } catch (Exception e) { 548 return FutureUtils.failedFuture(e); 549 } 550 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 551 addListener(registry.getClusterId(), (clusterId, error) -> { 552 if (error != null) { 553 registry.close(); 554 future.completeExceptionally(error); 555 return; 556 } 557 if (clusterId == null) { 558 registry.close(); 559 future.completeExceptionally(new IOException("clusterid came back null")); 560 return; 561 } 562 Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, 563 AsyncConnectionImpl.class, AsyncConnection.class); 564 try { 565 future.complete( 566 user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils 567 .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes))); 568 } catch (Exception e) { 569 registry.close(); 570 future.completeExceptionally(e); 571 } 572 }); 573 return future; 574 }, "ConnectionFactory.createAsyncConnection"); 575 } 576}