001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.net.URI;
025import java.security.PrivilegedExceptionAction;
026import java.util.Collections;
027import java.util.Map;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.ExecutorService;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.AuthUtil;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.security.User;
034import org.apache.hadoop.hbase.security.UserProvider;
035import org.apache.hadoop.hbase.trace.TraceUtil;
036import org.apache.hadoop.hbase.util.FutureUtils;
037import org.apache.hadoop.hbase.util.ReflectionUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039
040/**
041 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
042 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
043 * {@link Connection}, {@link Table} implementations are retrieved with
044 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
045 *
046 * <pre>
047 * Connection connection = ConnectionFactory.createConnection(config);
048 * Table table = connection.getTable(TableName.valueOf("table1"));
049 * try {
050 *   // Use the table as needed, for a single operation and a single thread
051 * } finally {
052 *   table.close();
053 *   connection.close();
054 * }
055 * </pre>
056 *
057 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
058 * credentials if caller has following two configurations set:
059 * <ul>
060 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
061 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
062 * </ul>
063 * By this way, caller can directly connect to kerberized cluster without caring login and
064 * credentials renewal logic in application.
065 *
066 * <pre>
067 * </pre>
068 *
069 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
070 * implementations.
071 * @see Connection
072 * @since 0.99.0
073 */
074@InterfaceAudience.Public
075public class ConnectionFactory {
076
077  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
078    "hbase.client.async.connection.impl";
079
080  /** No public c.tors */
081  protected ConnectionFactory() {
082  }
083
084  /**
085   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
086   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
087   * connection share zookeeper connection, meta cache, and connections to region servers and
088   * masters. <br>
089   * The caller is responsible for calling {@link Connection#close()} on the returned connection
090   * instance. Typical usage:
091   *
092   * <pre>
093   * try (Connection connection = ConnectionFactory.createConnection(conf);
094   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
095   *   table.get(...);
096   *   ...
097   * }
098   * </pre>
099   *
100   * @return Connection object for <code>conf</code>
101   */
102  public static Connection createConnection() throws IOException {
103    return createConnection(HBaseConfiguration.create());
104  }
105
106  /**
107   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
108   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
109   * connection share zookeeper connection, meta cache, and connections to region servers and
110   * masters. <br>
111   * The caller is responsible for calling {@link Connection#close()} on the returned connection
112   * instance. Typical usage:
113   *
114   * <pre>
115   * try (Connection connection = ConnectionFactory.createConnection(conf);
116   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
117   *   table.get(...);
118   *   ...
119   * }
120   * </pre>
121   *
122   * @param connectionUri the connection uri for the hbase cluster
123   * @return Connection object for <code>conf</code>
124   */
125  public static Connection createConnection(URI connectionUri) throws IOException {
126    return createConnection(connectionUri, HBaseConfiguration.create());
127  }
128
129  /**
130   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
131   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
132   * created from returned connection share zookeeper connection(if used), meta cache, and
133   * connections to region servers and masters. <br>
134   * The caller is responsible for calling {@link Connection#close()} on the returned connection
135   * instance. Typical usage:
136   *
137   * <pre>
138   * try (Connection connection = ConnectionFactory.createConnection(conf);
139   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
140   *   table.get(...);
141   *   ...
142   * }
143   * </pre>
144   *
145   * @param conf configuration
146   * @return Connection object for <code>conf</code>
147   */
148  public static Connection createConnection(Configuration conf) throws IOException {
149    return createConnection(conf, null, AuthUtil.loginClient(conf));
150  }
151
152  /**
153   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
154   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
155   * created from returned connection share zookeeper connection(if used), meta cache, and
156   * connections to region servers and masters. <br>
157   * The caller is responsible for calling {@link Connection#close()} on the returned connection
158   * instance. Typical usage:
159   *
160   * <pre>
161   * try (Connection connection = ConnectionFactory.createConnection(conf);
162   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
163   *   table.get(...);
164   *   ...
165   * }
166   * </pre>
167   *
168   * @param connectionUri the connection uri for the hbase cluster
169   * @param conf          configuration
170   * @return Connection object for <code>conf</code>
171   */
172  public static Connection createConnection(URI connectionUri, Configuration conf)
173    throws IOException {
174    return createConnection(connectionUri, conf, null, AuthUtil.loginClient(conf));
175  }
176
177  /**
178   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
179   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
180   * created from returned connection share zookeeper connection(if used), meta cache, and
181   * connections to region servers and masters. <br>
182   * The caller is responsible for calling {@link Connection#close()} on the returned connection
183   * instance. Typical usage:
184   *
185   * <pre>
186   * try (Connection connection = ConnectionFactory.createConnection(conf);
187   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
188   *   table.get(...);
189   *   ...
190   * }
191   * </pre>
192   *
193   * @param conf configuration
194   * @param pool the thread pool to use for batch operations
195   * @return Connection object for <code>conf</code>
196   */
197  public static Connection createConnection(Configuration conf, ExecutorService pool)
198    throws IOException {
199    return createConnection(conf, pool, AuthUtil.loginClient(conf));
200  }
201
202  /**
203   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
204   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
205   * created from returned connection share zookeeper connection(if used), meta cache, and
206   * connections to region servers and masters. <br>
207   * The caller is responsible for calling {@link Connection#close()} on the returned connection
208   * instance. Typical usage:
209   *
210   * <pre>
211   * try (Connection connection = ConnectionFactory.createConnection(conf);
212   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
213   *   table.get(...);
214   *   ...
215   * }
216   * </pre>
217   *
218   * @param connectionUri the connection uri for the hbase cluster
219   * @param conf          configuration
220   * @param pool          the thread pool to use for batch operations
221   * @return Connection object for <code>conf</code>
222   */
223  public static Connection createConnection(URI connectionUri, Configuration conf,
224    ExecutorService pool) throws IOException {
225    return createConnection(connectionUri, conf, pool, AuthUtil.loginClient(conf));
226  }
227
228  /**
229   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
230   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
231   * created from returned connection share zookeeper connection(if used), meta cache, and
232   * connections to region servers and masters. <br>
233   * The caller is responsible for calling {@link Connection#close()} on the returned connection
234   * instance. Typical usage:
235   *
236   * <pre>
237   * try (Connection connection = ConnectionFactory.createConnection(conf);
238   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
239   *   table.get(...);
240   *   ...
241   * }
242   * </pre>
243   *
244   * @param conf configuration
245   * @param user the user the connection is for
246   * @return Connection object for <code>conf</code>
247   */
248  public static Connection createConnection(Configuration conf, User user) throws IOException {
249    return createConnection(conf, null, user);
250  }
251
252  /**
253   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
254   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
255   * created from returned connection share zookeeper connection(if used), meta cache, and
256   * connections to region servers and masters. <br>
257   * The caller is responsible for calling {@link Connection#close()} on the returned connection
258   * instance. Typical usage:
259   *
260   * <pre>
261   * try (Connection connection = ConnectionFactory.createConnection(conf);
262   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
263   *   table.get(...);
264   *   ...
265   * }
266   * </pre>
267   *
268   * @param connectionUri the connection uri for the hbase cluster
269   * @param conf          configuration
270   * @param user          the user the connection is for
271   * @return Connection object for <code>conf</code>
272   */
273  public static Connection createConnection(URI connectionUri, Configuration conf, User user)
274    throws IOException {
275    return createConnection(connectionUri, conf, null, user);
276  }
277
278  /**
279   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
280   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
281   * created from returned connection share zookeeper connection(if used), meta cache, and
282   * connections to region servers and masters. <br>
283   * The caller is responsible for calling {@link Connection#close()} on the returned connection
284   * instance. Typical usage:
285   *
286   * <pre>
287   * try (Connection connection = ConnectionFactory.createConnection(conf);
288   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
289   *   table.get(...);
290   *   ...
291   * }
292   * </pre>
293   *
294   * @param conf configuration
295   * @param user the user the connection is for
296   * @param pool the thread pool to use for batch operations
297   * @return Connection object for <code>conf</code>
298   */
299  public static Connection createConnection(Configuration conf, ExecutorService pool,
300    final User user) throws IOException {
301    return createConnection(conf, pool, user, Collections.emptyMap());
302  }
303
304  /**
305   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
306   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
307   * created from returned connection share zookeeper connection(if used), meta cache, and
308   * connections to region servers and masters. <br>
309   * The caller is responsible for calling {@link Connection#close()} on the returned connection
310   * instance. Typical usage:
311   *
312   * <pre>
313   * try (Connection connection = ConnectionFactory.createConnection(conf);
314   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
315   *   table.get(...);
316   *   ...
317   * }
318   * </pre>
319   *
320   * @param connectionUri the connection uri for the hbase cluster
321   * @param conf          configuration
322   * @param user          the user the connection is for
323   * @param pool          the thread pool to use for batch operations
324   * @return Connection object for <code>conf</code>
325   */
326  public static Connection createConnection(URI connectionUri, Configuration conf,
327    ExecutorService pool, User user) throws IOException {
328    return createConnection(connectionUri, conf, pool, user, Collections.emptyMap());
329  }
330
331  /**
332   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
333   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
334   * created from returned connection share zookeeper connection(if used), meta cache, and
335   * connections to region servers and masters. <br>
336   * The caller is responsible for calling {@link Connection#close()} on the returned connection
337   * instance. Typical usage:
338   *
339   * <pre>
340   * try (Connection connection = ConnectionFactory.createConnection(conf);
341   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
342   *   table.get(...);
343   *   ...
344   * }
345   * </pre>
346   *
347   * @param conf                 configuration
348   * @param user                 the user the connection is for
349   * @param pool                 the thread pool to use for batch operations
350   * @param connectionAttributes attributes to be sent along to server during connection establish
351   * @return Connection object for <code>conf</code>
352   */
353  public static Connection createConnection(Configuration conf, ExecutorService pool,
354    final User user, Map<String, byte[]> connectionAttributes) throws IOException {
355    return createConnection(null, conf, pool, user, connectionAttributes);
356  }
357
358  /**
359   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
360   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
361   * created from returned connection share zookeeper connection(if used), meta cache, and
362   * connections to region servers and masters. <br>
363   * The caller is responsible for calling {@link Connection#close()} on the returned connection
364   * instance. Typical usage:
365   *
366   * <pre>
367   * Connection connection = ConnectionFactory.createConnection(conf);
368   * Table table = connection.getTable(TableName.valueOf("table1"));
369   * try (Connection connection = ConnectionFactory.createConnection(conf);
370   *   Table table = connection.getTable(TableName.valueOf("table1"))) {
371   *   table.get(...);
372   *   ...
373   * }
374   * </pre>
375   *
376   * @param connectionUri        the connection uri for the hbase cluster
377   * @param conf                 configuration
378   * @param user                 the user the connection is for
379   * @param pool                 the thread pool to use for batch operations
380   * @param connectionAttributes attributes to be sent along to server during connection establish
381   * @return Connection object for <code>conf</code>
382   */
383  public static Connection createConnection(URI connectionUri, Configuration conf,
384    ExecutorService pool, final User user, Map<String, byte[]> connectionAttributes)
385    throws IOException {
386    Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
387      ConnectionOverAsyncConnection.class, Connection.class);
388    if (clazz != ConnectionOverAsyncConnection.class) {
389      try {
390        // Default HCM#HCI is not accessible; make it so before invoking.
391        Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
392          ExecutorService.class, User.class, Map.class);
393        constructor.setAccessible(true);
394        return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
395          .newInstance(conf, pool, user, connectionAttributes));
396      } catch (Exception e) {
397        throw new IOException(e);
398      }
399    } else {
400      return FutureUtils.get(createAsyncConnection(connectionUri, conf, user, connectionAttributes))
401        .toConnection();
402    }
403  }
404
405  /**
406   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
407   * @see #createAsyncConnection(Configuration)
408   * @return AsyncConnection object wrapped by CompletableFuture
409   */
410  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
411    return createAsyncConnection(HBaseConfiguration.create());
412  }
413
414  /**
415   * Call {@link #createAsyncConnection(URI, Configuration)} using default HBaseConfiguration.
416   * @param connectionUri the connection uri for the hbase cluster
417   * @see #createAsyncConnection(URI, Configuration)
418   * @return AsyncConnection object wrapped by CompletableFuture
419   */
420  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri) {
421    return createAsyncConnection(connectionUri, HBaseConfiguration.create());
422  }
423
424  /**
425   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
426   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
427   * initialize the {@link UserProvider}.
428   * @param conf configuration
429   * @return AsyncConnection object wrapped by CompletableFuture
430   * @see #createAsyncConnection(Configuration, User)
431   * @see UserProvider
432   */
433  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
434    return createAsyncConnection(null, conf);
435  }
436
437  /**
438   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code connectionUri},
439   * {@code conf} and a User object created by {@link UserProvider}. The given {@code conf} will
440   * also be used to initialize the {@link UserProvider}.
441   * @param connectionUri the connection uri for the hbase cluster
442   * @param conf          configuration
443   * @return AsyncConnection object wrapped by CompletableFuture
444   * @see #createAsyncConnection(Configuration, User)
445   * @see UserProvider
446   */
447  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
448    Configuration conf) {
449    User user;
450    try {
451      user = AuthUtil.loginClient(conf);
452    } catch (IOException e) {
453      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
454      future.completeExceptionally(e);
455      return future;
456    }
457    return createAsyncConnection(connectionUri, conf, user);
458  }
459
460  /**
461   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
462   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
463   * interfaces created from returned connection share zookeeper connection, meta cache, and
464   * connections to region servers and masters.
465   * <p>
466   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
467   * connection instance.
468   * <p>
469   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
470   * as it is thread safe.
471   * @param conf configuration
472   * @param user the user the asynchronous connection is for
473   * @return AsyncConnection object wrapped by CompletableFuture
474   */
475  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
476    final User user) {
477    return createAsyncConnection(null, conf, user);
478  }
479
480  /**
481   * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
482   * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
483   * All tables and interfaces created from returned connection share zookeeper connection(if used),
484   * meta cache, and connections to region servers and masters.
485   * <p>
486   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
487   * connection instance.
488   * <p>
489   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
490   * as it is thread safe.
491   * @param connectionUri the connection uri for the hbase cluster
492   * @param conf          configuration
493   * @param user          the user the asynchronous connection is for
494   * @return AsyncConnection object wrapped by CompletableFuture
495   */
496  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
497    Configuration conf, final User user) {
498    return createAsyncConnection(connectionUri, conf, user, null);
499  }
500
501  /**
502   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
503   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
504   * interfaces created from returned connection share zookeeper connection, meta cache, and
505   * connections to region servers and masters.
506   * <p>
507   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
508   * connection instance.
509   * <p>
510   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
511   * as it is thread safe.
512   * @param conf                 configuration
513   * @param user                 the user the asynchronous connection is for
514   * @param connectionAttributes attributes to be sent along to server during connection establish
515   * @return AsyncConnection object wrapped by CompletableFuture
516   */
517  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
518    final User user, Map<String, byte[]> connectionAttributes) {
519    return createAsyncConnection(null, conf, user, connectionAttributes);
520  }
521
522  /**
523   * Create a new AsyncConnection instance using the passed {@code connectionUri}, {@code conf} and
524   * {@code user}. AsyncConnection encapsulates all housekeeping for a connection to the cluster.
525   * All tables and interfaces created from returned connection share zookeeper connection(if used),
526   * meta cache, and connections to region servers and masters.
527   * <p>
528   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
529   * connection instance.
530   * <p>
531   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
532   * as it is thread safe.
533   * @param connectionUri        the connection uri for the hbase cluster
534   * @param conf                 configuration
535   * @param user                 the user the asynchronous connection is for
536   * @param connectionAttributes attributes to be sent along to server during connection establish
537   * @return AsyncConnection object wrapped by CompletableFuture
538   */
539  public static CompletableFuture<AsyncConnection> createAsyncConnection(URI connectionUri,
540    Configuration conf, final User user, Map<String, byte[]> connectionAttributes) {
541    return TraceUtil.tracedFuture(() -> {
542      ConnectionRegistry registry;
543      try {
544        registry = connectionUri != null
545          ? ConnectionRegistryFactory.create(connectionUri, conf, user)
546          : ConnectionRegistryFactory.create(conf, user);
547      } catch (Exception e) {
548        return FutureUtils.failedFuture(e);
549      }
550      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
551      addListener(registry.getClusterId(), (clusterId, error) -> {
552        if (error != null) {
553          registry.close();
554          future.completeExceptionally(error);
555          return;
556        }
557        if (clusterId == null) {
558          registry.close();
559          future.completeExceptionally(new IOException("clusterid came back null"));
560          return;
561        }
562        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
563          AsyncConnectionImpl.class, AsyncConnection.class);
564        try {
565          future.complete(
566            user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
567              .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes)));
568        } catch (Exception e) {
569          registry.close();
570          future.completeExceptionally(e);
571        }
572      });
573      return future;
574    }, "ConnectionFactory.createAsyncConnection");
575  }
576}