001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.IOException;
024import java.lang.reflect.Constructor;
025import java.security.PrivilegedExceptionAction;
026import java.util.concurrent.CompletableFuture;
027import java.util.concurrent.ExecutorService;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.AuthUtil;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.security.User;
032import org.apache.hadoop.hbase.security.UserProvider;
033import org.apache.hadoop.hbase.trace.TraceUtil;
034import org.apache.hadoop.hbase.util.FutureUtils;
035import org.apache.hadoop.hbase.util.ReflectionUtils;
036import org.apache.yetus.audience.InterfaceAudience;
037
038/**
039 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
040 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
041 * {@link Connection}, {@link Table} implementations are retrieved with
042 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
043 *
044 * <pre>
045 * Connection connection = ConnectionFactory.createConnection(config);
046 * Table table = connection.getTable(TableName.valueOf("table1"));
047 * try {
048 *   // Use the table as needed, for a single operation and a single thread
049 * } finally {
050 *   table.close();
051 *   connection.close();
052 * }
053 * </pre>
054 *
055 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
056 * credentials if caller has following two configurations set:
057 * <ul>
058 *   <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
059 *   <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
060 * </ul>
061 * By this way, caller can directly connect to kerberized cluster without caring login and
062 * credentials renewal logic in application.
063 * <pre>
064 * </pre>
065 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
066 * implementations.
067 * @see Connection
068 * @since 0.99.0
069 */
070@InterfaceAudience.Public
071public class ConnectionFactory {
072
073  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl";
074
075  /** No public c.tors */
076  protected ConnectionFactory() {
077  }
078
079  /**
080   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
081   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
082   * connection share zookeeper connection, meta cache, and connections to region servers and
083   * masters. <br>
084   * The caller is responsible for calling {@link Connection#close()} on the returned connection
085   * instance. Typical usage:
086   *
087   * <pre>
088   * Connection connection = ConnectionFactory.createConnection();
089   * Table table = connection.getTable(TableName.valueOf("mytable"));
090   * try {
091   *   table.get(...);
092   *   ...
093   * } finally {
094   *   table.close();
095   *   connection.close();
096   * }
097   * </pre>
098   *
099   * @return Connection object for <code>conf</code>
100   */
101  public static Connection createConnection() throws IOException {
102    Configuration conf = HBaseConfiguration.create();
103    return createConnection(conf, null, AuthUtil.loginClient(conf));
104  }
105
106  /**
107   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
108   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
109   * created from returned connection share zookeeper connection, meta cache, and connections to
110   * region servers and masters. <br>
111   * The caller is responsible for calling {@link Connection#close()} on the returned connection
112   * instance. Typical usage:
113   *
114   * <pre>
115   * Connection connection = ConnectionFactory.createConnection(conf);
116   * Table table = connection.getTable(TableName.valueOf("mytable"));
117   * try {
118   *   table.get(...);
119   *   ...
120   * } finally {
121   *   table.close();
122   *   connection.close();
123   * }
124   * </pre>
125   *
126   * @param conf configuration
127   * @return Connection object for <code>conf</code>
128   */
129  public static Connection createConnection(Configuration conf) throws IOException {
130    return createConnection(conf, null, AuthUtil.loginClient(conf));
131  }
132
133  /**
134   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
135   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
136   * created from returned connection share zookeeper connection, meta cache, and connections to
137   * region servers and masters. <br>
138   * The caller is responsible for calling {@link Connection#close()} on the returned connection
139   * instance. Typical usage:
140   *
141   * <pre>
142   * Connection connection = ConnectionFactory.createConnection(conf);
143   * Table table = connection.getTable(TableName.valueOf("mytable"));
144   * try {
145   *   table.get(...);
146   *   ...
147   * } finally {
148   *   table.close();
149   *   connection.close();
150   * }
151   * </pre>
152   *
153   * @param conf configuration
154   * @param pool the thread pool to use for batch operations
155   * @return Connection object for <code>conf</code>
156   */
157  public static Connection createConnection(Configuration conf, ExecutorService pool)
158      throws IOException {
159    return createConnection(conf, pool, AuthUtil.loginClient(conf));
160  }
161
162  /**
163   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
164   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
165   * created from returned connection share zookeeper connection, meta cache, and connections to
166   * region servers and masters. <br>
167   * The caller is responsible for calling {@link Connection#close()} on the returned connection
168   * instance. Typical usage:
169   *
170   * <pre>
171   * Connection connection = ConnectionFactory.createConnection(conf);
172   * Table table = connection.getTable(TableName.valueOf("table1"));
173   * try {
174   *   table.get(...);
175   *   ...
176   * } finally {
177   *   table.close();
178   *   connection.close();
179   * }
180   * </pre>
181   *
182   * @param conf configuration
183   * @param user the user the connection is for
184   * @return Connection object for <code>conf</code>
185   */
186  public static Connection createConnection(Configuration conf, User user) throws IOException {
187    return createConnection(conf, null, user);
188  }
189
190  /**
191   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
192   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
193   * created from returned connection share zookeeper connection, meta cache, and connections to
194   * region servers and masters. <br>
195   * The caller is responsible for calling {@link Connection#close()} on the returned connection
196   * instance. Typical usage:
197   *
198   * <pre>
199   * Connection connection = ConnectionFactory.createConnection(conf);
200   * Table table = connection.getTable(TableName.valueOf("table1"));
201   * try {
202   *   table.get(...);
203   *   ...
204   * } finally {
205   *   table.close();
206   *   connection.close();
207   * }
208   * </pre>
209   *
210   * @param conf configuration
211   * @param user the user the connection is for
212   * @param pool the thread pool to use for batch operations
213   * @return Connection object for <code>conf</code>
214   */
215  public static Connection createConnection(Configuration conf, ExecutorService pool,
216      final User user) throws IOException {
217    Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
218      ConnectionOverAsyncConnection.class, Connection.class);
219    if (clazz != ConnectionOverAsyncConnection.class) {
220      try {
221        // Default HCM#HCI is not accessible; make it so before invoking.
222        Constructor<?> constructor =
223          clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
224        constructor.setAccessible(true);
225        return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
226          .newInstance(conf, pool, user));
227      } catch (Exception e) {
228        throw new IOException(e);
229      }
230    } else {
231      return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
232    }
233  }
234
235  /**
236   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
237   * @see #createAsyncConnection(Configuration)
238   * @return AsyncConnection object wrapped by CompletableFuture
239   */
240  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
241    return createAsyncConnection(HBaseConfiguration.create());
242  }
243
244  /**
245   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
246   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
247   * initialize the {@link UserProvider}.
248   * @param conf configuration
249   * @return AsyncConnection object wrapped by CompletableFuture
250   * @see #createAsyncConnection(Configuration, User)
251   * @see UserProvider
252   */
253  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
254    User user;
255    try {
256      user = AuthUtil.loginClient(conf);
257    } catch (IOException e) {
258      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
259      future.completeExceptionally(e);
260      return future;
261    }
262    return createAsyncConnection(conf, user);
263  }
264
265  /**
266   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
267   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
268   * interfaces created from returned connection share zookeeper connection, meta cache, and
269   * connections to region servers and masters.
270   * <p>
271   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
272   * connection instance.
273   * <p>
274   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
275   * as it is thread safe.
276   * @param conf configuration
277   * @param user the user the asynchronous connection is for
278   * @return AsyncConnection object wrapped by CompletableFuture
279   */
280  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
281    final User user) {
282    return TraceUtil.tracedFuture(() -> {
283      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
284      ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
285      addListener(registry.getClusterId(), (clusterId, error) -> {
286        if (error != null) {
287          registry.close();
288          future.completeExceptionally(error);
289          return;
290        }
291        if (clusterId == null) {
292          registry.close();
293          future.completeExceptionally(new IOException("clusterid came back null"));
294          return;
295        }
296        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
297          AsyncConnectionImpl.class, AsyncConnection.class);
298        try {
299          future.complete(
300            user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
301              .newInstance(clazz, conf, registry, clusterId, null, user)));
302        } catch (Exception e) {
303          registry.close();
304          future.completeExceptionally(e);
305        }
306      });
307      return future;
308    }, "ConnectionFactory.createAsyncConnection");
309  }
310}