001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.IOException;
024import java.lang.reflect.Constructor;
025import java.security.PrivilegedExceptionAction;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutorService;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.AuthUtil;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.security.User;
032import org.apache.hadoop.hbase.security.UserProvider;
033import org.apache.hadoop.hbase.util.ReflectionUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035
036/**
037 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
038 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
039 * {@link Connection}, {@link Table} implementations are retrieved with
040 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
041 *
042 * <pre>
043 * Connection connection = ConnectionFactory.createConnection(config);
044 * Table table = connection.getTable(TableName.valueOf("table1"));
045 * try {
046 *   // Use the table as needed, for a single operation and a single thread
047 * } finally {
048 *   table.close();
049 *   connection.close();
050 * }
051 * </pre>
052 *
053 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
054 * credentials if caller has following two configurations set:
055 * <ul>
056 *   <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
057 *   <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
058 * </ul>
059 * By this way, caller can directly connect to kerberized cluster without caring login and
060 * credentials renewal logic in application.
061 * <pre>
062 * </pre>
063 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
064 * implementations.
065 * @see Connection
066 * @since 0.99.0
067 */
068@InterfaceAudience.Public
069public class ConnectionFactory {
070
071  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl";
072
073  /** No public c.tors */
074  protected ConnectionFactory() {
075  }
076
077  /**
078   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
079   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
080   * connection share zookeeper connection, meta cache, and connections to region servers and
081   * masters. <br>
082   * The caller is responsible for calling {@link Connection#close()} on the returned connection
083   * instance. Typical usage:
084   *
085   * <pre>
086   * Connection connection = ConnectionFactory.createConnection();
087   * Table table = connection.getTable(TableName.valueOf("mytable"));
088   * try {
089   *   table.get(...);
090   *   ...
091   * } finally {
092   *   table.close();
093   *   connection.close();
094   * }
095   * </pre>
096   *
097   * @return Connection object for <code>conf</code>
098   */
099  public static Connection createConnection() throws IOException {
100    Configuration conf = HBaseConfiguration.create();
101    return createConnection(conf, null, AuthUtil.loginClient(conf));
102  }
103
104  /**
105   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
106   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
107   * created from returned connection share zookeeper connection, meta cache, and connections to
108   * region servers and masters. <br>
109   * The caller is responsible for calling {@link Connection#close()} on the returned connection
110   * instance. Typical usage:
111   *
112   * <pre>
113   * Connection connection = ConnectionFactory.createConnection(conf);
114   * Table table = connection.getTable(TableName.valueOf("mytable"));
115   * try {
116   *   table.get(...);
117   *   ...
118   * } finally {
119   *   table.close();
120   *   connection.close();
121   * }
122   * </pre>
123   *
124   * @param conf configuration
125   * @return Connection object for <code>conf</code>
126   */
127  public static Connection createConnection(Configuration conf) throws IOException {
128    return createConnection(conf, null, AuthUtil.loginClient(conf));
129  }
130
131  /**
132   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
133   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
134   * created from returned connection share zookeeper connection, meta cache, and connections to
135   * region servers and masters. <br>
136   * The caller is responsible for calling {@link Connection#close()} on the returned connection
137   * instance. Typical usage:
138   *
139   * <pre>
140   * Connection connection = ConnectionFactory.createConnection(conf);
141   * Table table = connection.getTable(TableName.valueOf("mytable"));
142   * try {
143   *   table.get(...);
144   *   ...
145   * } finally {
146   *   table.close();
147   *   connection.close();
148   * }
149   * </pre>
150   *
151   * @param conf configuration
152   * @param pool the thread pool to use for batch operations
153   * @return Connection object for <code>conf</code>
154   */
155  public static Connection createConnection(Configuration conf, ExecutorService pool)
156      throws IOException {
157    return createConnection(conf, pool, AuthUtil.loginClient(conf));
158  }
159
160  /**
161   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
162   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
163   * created from returned connection share zookeeper connection, meta cache, and connections to
164   * region servers and masters. <br>
165   * The caller is responsible for calling {@link Connection#close()} on the returned connection
166   * instance. Typical usage:
167   *
168   * <pre>
169   * Connection connection = ConnectionFactory.createConnection(conf);
170   * Table table = connection.getTable(TableName.valueOf("table1"));
171   * try {
172   *   table.get(...);
173   *   ...
174   * } finally {
175   *   table.close();
176   *   connection.close();
177   * }
178   * </pre>
179   *
180   * @param conf configuration
181   * @param user the user the connection is for
182   * @return Connection object for <code>conf</code>
183   */
184  public static Connection createConnection(Configuration conf, User user) throws IOException {
185    return createConnection(conf, null, user);
186  }
187
188  /**
189   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
190   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
191   * created from returned connection share zookeeper connection, meta cache, and connections to
192   * region servers and masters. <br>
193   * The caller is responsible for calling {@link Connection#close()} on the returned connection
194   * instance. Typical usage:
195   *
196   * <pre>
197   * Connection connection = ConnectionFactory.createConnection(conf);
198   * Table table = connection.getTable(TableName.valueOf("table1"));
199   * try {
200   *   table.get(...);
201   *   ...
202   * } finally {
203   *   table.close();
204   *   connection.close();
205   * }
206   * </pre>
207   *
208   * @param conf configuration
209   * @param user the user the connection is for
210   * @param pool the thread pool to use for batch operations
211   * @return Connection object for <code>conf</code>
212   */
213  public static Connection createConnection(Configuration conf, ExecutorService pool,
214    final User user) throws IOException {
215    String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
216      ConnectionImplementation.class.getName());
217    Class<?> clazz;
218    try {
219      clazz = Class.forName(className);
220    } catch (ClassNotFoundException e) {
221      throw new IOException(e);
222    }
223    try {
224      // Default HCM#HCI is not accessible; make it so before invoking.
225      Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
226        ExecutorService.class, User.class);
227      constructor.setAccessible(true);
228      return user.runAs(
229        (PrivilegedExceptionAction<Connection>)() ->
230          (Connection) constructor.newInstance(conf, pool, user));
231    } catch (Exception e) {
232      throw new IOException(e);
233    }
234  }
235
236  /**
237   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
238   * @see #createAsyncConnection(Configuration)
239   * @return AsyncConnection object wrapped by CompletableFuture
240   */
241  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
242    return createAsyncConnection(HBaseConfiguration.create());
243  }
244
245  /**
246   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
247   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
248   * initialize the {@link UserProvider}.
249   * @param conf configuration
250   * @return AsyncConnection object wrapped by CompletableFuture
251   * @see #createAsyncConnection(Configuration, User)
252   * @see UserProvider
253   */
254  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
255    User user;
256    try {
257      user = AuthUtil.loginClient(conf);
258    } catch (IOException e) {
259      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
260      future.completeExceptionally(e);
261      return future;
262    }
263    return createAsyncConnection(conf, user);
264  }
265
266  /**
267   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
268   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
269   * interfaces created from returned connection share zookeeper connection, meta cache, and
270   * connections to region servers and masters.
271   * <p>
272   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
273   * connection instance.
274   * <p>
275   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
276   * as it is thread safe.
277   * @param conf configuration
278   * @param user the user the asynchronous connection is for
279   * @return AsyncConnection object wrapped by CompletableFuture
280   */
281  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
282      final User user) {
283    CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
284    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
285    addListener(registry.getClusterId(), (clusterId, error) -> {
286      if (error != null) {
287        registry.close();
288        future.completeExceptionally(error);
289        return;
290      }
291      if (clusterId == null) {
292        registry.close();
293        future.completeExceptionally(new IOException("clusterid came back null"));
294        return;
295      }
296      Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
297        AsyncConnectionImpl.class, AsyncConnection.class);
298      try {
299        future.complete(
300          user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
301            .newInstance(clazz, conf, registry, clusterId, user)));
302      } catch (Exception e) {
303        registry.close();
304        future.completeExceptionally(e);
305      }
306    });
307    return future;
308  }
309}