001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.security.PrivilegedExceptionAction;
025import java.util.concurrent.CompletableFuture;
026import java.util.concurrent.ExecutorService;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.AuthUtil;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.security.UserProvider;
032import org.apache.hadoop.hbase.trace.TraceUtil;
033import org.apache.hadoop.hbase.util.ReflectionUtils;
034import org.apache.yetus.audience.InterfaceAudience;
035
036/**
037 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
038 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
039 * {@link Connection}, {@link Table} implementations are retrieved with
040 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
041 *
042 * <pre>
043 * Connection connection = ConnectionFactory.createConnection(config);
044 * Table table = connection.getTable(TableName.valueOf("table1"));
045 * try {
046 *   // Use the table as needed, for a single operation and a single thread
047 * } finally {
048 *   table.close();
049 *   connection.close();
050 * }
051 * </pre>
052 *
053 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
054 * credentials if caller has following two configurations set:
055 * <ul>
056 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
057 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
058 * </ul>
059 * By this way, caller can directly connect to kerberized cluster without caring login and
060 * credentials renewal logic in application.
061 *
062 * <pre>
063 * </pre>
064 *
065 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
066 * implementations.
067 * @see Connection
068 * @since 0.99.0
069 */
070@InterfaceAudience.Public
071public class ConnectionFactory {
072
073  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
074    "hbase.client.async.connection.impl";
075
076  /** No public c.tors */
077  protected ConnectionFactory() {
078  }
079
080  /**
081   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
082   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
083   * connection share zookeeper connection, meta cache, and connections to region servers and
084   * masters. <br>
085   * The caller is responsible for calling {@link Connection#close()} on the returned connection
086   * instance. Typical usage:
087   *
088   * <pre>
089   * Connection connection = ConnectionFactory.createConnection();
090   * Table table = connection.getTable(TableName.valueOf("mytable"));
091   * try {
092   *   table.get(...);
093   *   ...
094   * } finally {
095   *   table.close();
096   *   connection.close();
097   * }
098   * </pre>
099   *
100   * @return Connection object for <code>conf</code>
101   */
102  public static Connection createConnection() throws IOException {
103    Configuration conf = HBaseConfiguration.create();
104    return createConnection(conf, null, AuthUtil.loginClient(conf));
105  }
106
107  /**
108   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
109   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
110   * created from returned connection share zookeeper connection, meta cache, and connections to
111   * region servers and masters. <br>
112   * The caller is responsible for calling {@link Connection#close()} on the returned connection
113   * instance. Typical usage:
114   *
115   * <pre>
116   * Connection connection = ConnectionFactory.createConnection(conf);
117   * Table table = connection.getTable(TableName.valueOf("mytable"));
118   * try {
119   *   table.get(...);
120   *   ...
121   * } finally {
122   *   table.close();
123   *   connection.close();
124   * }
125   * </pre>
126   *
127   * @param conf configuration
128   * @return Connection object for <code>conf</code>
129   */
130  public static Connection createConnection(Configuration conf) throws IOException {
131    return createConnection(conf, null, AuthUtil.loginClient(conf));
132  }
133
134  /**
135   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
136   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
137   * created from returned connection share zookeeper connection, meta cache, and connections to
138   * region servers and masters. <br>
139   * The caller is responsible for calling {@link Connection#close()} on the returned connection
140   * instance. Typical usage:
141   *
142   * <pre>
143   * Connection connection = ConnectionFactory.createConnection(conf);
144   * Table table = connection.getTable(TableName.valueOf("mytable"));
145   * try {
146   *   table.get(...);
147   *   ...
148   * } finally {
149   *   table.close();
150   *   connection.close();
151   * }
152   * </pre>
153   *
154   * @param conf configuration
155   * @param pool the thread pool to use for batch operations
156   * @return Connection object for <code>conf</code>
157   */
158  public static Connection createConnection(Configuration conf, ExecutorService pool)
159    throws IOException {
160    return createConnection(conf, pool, AuthUtil.loginClient(conf));
161  }
162
163  /**
164   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
165   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
166   * created from returned connection share zookeeper connection, meta cache, and connections to
167   * region servers and masters. <br>
168   * The caller is responsible for calling {@link Connection#close()} on the returned connection
169   * instance. Typical usage:
170   *
171   * <pre>
172   * Connection connection = ConnectionFactory.createConnection(conf);
173   * Table table = connection.getTable(TableName.valueOf("table1"));
174   * try {
175   *   table.get(...);
176   *   ...
177   * } finally {
178   *   table.close();
179   *   connection.close();
180   * }
181   * </pre>
182   *
183   * @param conf configuration
184   * @param user the user the connection is for
185   * @return Connection object for <code>conf</code>
186   */
187  public static Connection createConnection(Configuration conf, User user) throws IOException {
188    return createConnection(conf, null, user);
189  }
190
191  /**
192   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
193   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
194   * created from returned connection share zookeeper connection, meta cache, and connections to
195   * region servers and masters. <br>
196   * The caller is responsible for calling {@link Connection#close()} on the returned connection
197   * instance. Typical usage:
198   *
199   * <pre>
200   * Connection connection = ConnectionFactory.createConnection(conf);
201   * Table table = connection.getTable(TableName.valueOf("table1"));
202   * try {
203   *   table.get(...);
204   *   ...
205   * } finally {
206   *   table.close();
207   *   connection.close();
208   * }
209   * </pre>
210   *
211   * @param conf configuration
212   * @param user the user the connection is for
213   * @param pool the thread pool to use for batch operations
214   * @return Connection object for <code>conf</code>
215   */
216  public static Connection createConnection(Configuration conf, ExecutorService pool,
217    final User user) throws IOException {
218    return TraceUtil.trace(() -> {
219      String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
220        ConnectionImplementation.class.getName());
221      Class<?> clazz;
222      try {
223        clazz = Class.forName(className);
224      } catch (ClassNotFoundException e) {
225        throw new IOException(e);
226      }
227      try {
228        // Default HCM#HCI is not accessible; make it so before invoking.
229        Constructor<?> constructor =
230          clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
231        constructor.setAccessible(true);
232        return user.runAs((PrivilegedExceptionAction<
233          Connection>) () -> (Connection) constructor.newInstance(conf, pool, user));
234      } catch (Exception e) {
235        throw new IOException(e);
236      }
237    }, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection"));
238  }
239
240  /**
241   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
242   * @see #createAsyncConnection(Configuration)
243   * @return AsyncConnection object wrapped by CompletableFuture
244   */
245  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
246    return createAsyncConnection(HBaseConfiguration.create());
247  }
248
249  /**
250   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
251   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
252   * initialize the {@link UserProvider}.
253   * @param conf configuration
254   * @return AsyncConnection object wrapped by CompletableFuture
255   * @see #createAsyncConnection(Configuration, User)
256   * @see UserProvider
257   */
258  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
259    User user;
260    try {
261      user = AuthUtil.loginClient(conf);
262    } catch (IOException e) {
263      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
264      future.completeExceptionally(e);
265      return future;
266    }
267    return createAsyncConnection(conf, user);
268  }
269
270  /**
271   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
272   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
273   * interfaces created from returned connection share zookeeper connection, meta cache, and
274   * connections to region servers and masters.
275   * <p>
276   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
277   * connection instance.
278   * <p>
279   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
280   * as it is thread safe.
281   * @param conf configuration
282   * @param user the user the asynchronous connection is for
283   * @return AsyncConnection object wrapped by CompletableFuture
284   */
285  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
286    final User user) {
287    return TraceUtil.tracedFuture(() -> {
288      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
289      ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
290      addListener(registry.getClusterId(), (clusterId, error) -> {
291        if (error != null) {
292          registry.close();
293          future.completeExceptionally(error);
294          return;
295        }
296        if (clusterId == null) {
297          registry.close();
298          future.completeExceptionally(new IOException("clusterid came back null"));
299          return;
300        }
301        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
302          AsyncConnectionImpl.class, AsyncConnection.class);
303        try {
304          future.complete(
305            user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
306              .newInstance(clazz, conf, registry, clusterId, user)));
307        } catch (Exception e) {
308          registry.close();
309          future.completeExceptionally(e);
310        }
311      });
312      return future;
313    }, "ConnectionFactory.createAsyncConnection");
314  }
315}