001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.IOException;
024import java.lang.reflect.Constructor;
025import java.security.PrivilegedExceptionAction;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutorService;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.AuthUtil;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.security.User;
032import org.apache.hadoop.hbase.security.UserProvider;
033import org.apache.hadoop.hbase.util.FutureUtils;
034import org.apache.hadoop.hbase.util.ReflectionUtils;
035import org.apache.yetus.audience.InterfaceAudience;
036
037/**
038 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
039 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
040 * {@link Connection}, {@link Table} implementations are retrieved with
041 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
042 *
043 * <pre>
044 * Connection connection = ConnectionFactory.createConnection(config);
045 * Table table = connection.getTable(TableName.valueOf("table1"));
046 * try {
047 *   // Use the table as needed, for a single operation and a single thread
048 * } finally {
049 *   table.close();
050 *   connection.close();
051 * }
052 * </pre>
053 *
054 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
055 * credentials if caller has following two configurations set:
056 * <ul>
057 *   <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
058 *   <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
059 * </ul>
060 * By this way, caller can directly connect to kerberized cluster without caring login and
061 * credentials renewal logic in application.
062 * <pre>
063 * </pre>
064 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
065 * implementations.
066 * @see Connection
067 * @since 0.99.0
068 */
069@InterfaceAudience.Public
070public class ConnectionFactory {
071
072  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl";
073
074  /** No public c.tors */
075  protected ConnectionFactory() {
076  }
077
078  /**
079   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
080   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
081   * connection share zookeeper connection, meta cache, and connections to region servers and
082   * masters. <br>
083   * The caller is responsible for calling {@link Connection#close()} on the returned connection
084   * instance. Typical usage:
085   *
086   * <pre>
087   * Connection connection = ConnectionFactory.createConnection();
088   * Table table = connection.getTable(TableName.valueOf("mytable"));
089   * try {
090   *   table.get(...);
091   *   ...
092   * } finally {
093   *   table.close();
094   *   connection.close();
095   * }
096   * </pre>
097   *
098   * @return Connection object for <code>conf</code>
099   */
100  public static Connection createConnection() throws IOException {
101    Configuration conf = HBaseConfiguration.create();
102    return createConnection(conf, null, AuthUtil.loginClient(conf));
103  }
104
105  /**
106   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
107   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
108   * created from returned connection share zookeeper connection, meta cache, and connections to
109   * region servers and masters. <br>
110   * The caller is responsible for calling {@link Connection#close()} on the returned connection
111   * instance. Typical usage:
112   *
113   * <pre>
114   * Connection connection = ConnectionFactory.createConnection(conf);
115   * Table table = connection.getTable(TableName.valueOf("mytable"));
116   * try {
117   *   table.get(...);
118   *   ...
119   * } finally {
120   *   table.close();
121   *   connection.close();
122   * }
123   * </pre>
124   *
125   * @param conf configuration
126   * @return Connection object for <code>conf</code>
127   */
128  public static Connection createConnection(Configuration conf) throws IOException {
129    return createConnection(conf, null, AuthUtil.loginClient(conf));
130  }
131
132  /**
133   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
134   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
135   * created from returned connection share zookeeper connection, meta cache, and connections to
136   * region servers and masters. <br>
137   * The caller is responsible for calling {@link Connection#close()} on the returned connection
138   * instance. Typical usage:
139   *
140   * <pre>
141   * Connection connection = ConnectionFactory.createConnection(conf);
142   * Table table = connection.getTable(TableName.valueOf("mytable"));
143   * try {
144   *   table.get(...);
145   *   ...
146   * } finally {
147   *   table.close();
148   *   connection.close();
149   * }
150   * </pre>
151   *
152   * @param conf configuration
153   * @param pool the thread pool to use for batch operations
154   * @return Connection object for <code>conf</code>
155   */
156  public static Connection createConnection(Configuration conf, ExecutorService pool)
157      throws IOException {
158    return createConnection(conf, pool, AuthUtil.loginClient(conf));
159  }
160
161  /**
162   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
163   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
164   * created from returned connection share zookeeper connection, meta cache, and connections to
165   * region servers and masters. <br>
166   * The caller is responsible for calling {@link Connection#close()} on the returned connection
167   * instance. Typical usage:
168   *
169   * <pre>
170   * Connection connection = ConnectionFactory.createConnection(conf);
171   * Table table = connection.getTable(TableName.valueOf("table1"));
172   * try {
173   *   table.get(...);
174   *   ...
175   * } finally {
176   *   table.close();
177   *   connection.close();
178   * }
179   * </pre>
180   *
181   * @param conf configuration
182   * @param user the user the connection is for
183   * @return Connection object for <code>conf</code>
184   */
185  public static Connection createConnection(Configuration conf, User user) throws IOException {
186    return createConnection(conf, null, user);
187  }
188
189  /**
190   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
191   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
192   * created from returned connection share zookeeper connection, meta cache, and connections to
193   * region servers and masters. <br>
194   * The caller is responsible for calling {@link Connection#close()} on the returned connection
195   * instance. Typical usage:
196   *
197   * <pre>
198   * Connection connection = ConnectionFactory.createConnection(conf);
199   * Table table = connection.getTable(TableName.valueOf("table1"));
200   * try {
201   *   table.get(...);
202   *   ...
203   * } finally {
204   *   table.close();
205   *   connection.close();
206   * }
207   * </pre>
208   *
209   * @param conf configuration
210   * @param user the user the connection is for
211   * @param pool the thread pool to use for batch operations
212   * @return Connection object for <code>conf</code>
213   */
214  public static Connection createConnection(Configuration conf, ExecutorService pool,
215      final User user) throws IOException {
216    Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
217      ConnectionOverAsyncConnection.class, Connection.class);
218    if (clazz != ConnectionOverAsyncConnection.class) {
219      try {
220        // Default HCM#HCI is not accessible; make it so before invoking.
221        Constructor<?> constructor =
222          clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
223        constructor.setAccessible(true);
224        return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
225          .newInstance(conf, pool, user));
226      } catch (Exception e) {
227        throw new IOException(e);
228      }
229    } else {
230      return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
231    }
232  }
233
234  /**
235   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
236   * @see #createAsyncConnection(Configuration)
237   * @return AsyncConnection object wrapped by CompletableFuture
238   */
239  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
240    return createAsyncConnection(HBaseConfiguration.create());
241  }
242
243  /**
244   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
245   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
246   * initialize the {@link UserProvider}.
247   * @param conf configuration
248   * @return AsyncConnection object wrapped by CompletableFuture
249   * @see #createAsyncConnection(Configuration, User)
250   * @see UserProvider
251   */
252  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
253    User user;
254    try {
255      user = AuthUtil.loginClient(conf);
256    } catch (IOException e) {
257      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
258      future.completeExceptionally(e);
259      return future;
260    }
261    return createAsyncConnection(conf, user);
262  }
263
264  /**
265   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
266   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
267   * interfaces created from returned connection share zookeeper connection, meta cache, and
268   * connections to region servers and masters.
269   * <p>
270   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
271   * connection instance.
272   * <p>
273   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
274   * as it is thread safe.
275   * @param conf configuration
276   * @param user the user the asynchronous connection is for
277   * @return AsyncConnection object wrapped by CompletableFuture
278   */
279  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
280      final User user) {
281    CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
282    ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
283    addListener(registry.getClusterId(), (clusterId, error) -> {
284      if (error != null) {
285        registry.close();
286        future.completeExceptionally(error);
287        return;
288      }
289      if (clusterId == null) {
290        registry.close();
291        future.completeExceptionally(new IOException("clusterid came back null"));
292        return;
293      }
294      Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
295        AsyncConnectionImpl.class, AsyncConnection.class);
296      try {
297        future.complete(
298          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
299            .newInstance(clazz, conf, registry, clusterId, null, user)));
300      } catch (Exception e) {
301        registry.close();
302        future.completeExceptionally(e);
303      }
304    });
305    return future;
306  }
307}