001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.security.PrivilegedExceptionAction;
025import java.util.concurrent.CompletableFuture;
026import java.util.concurrent.ExecutorService;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.AuthUtil;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.security.UserProvider;
032import org.apache.hadoop.hbase.trace.TraceUtil;
033import org.apache.hadoop.hbase.util.FutureUtils;
034import org.apache.hadoop.hbase.util.ReflectionUtils;
035import org.apache.yetus.audience.InterfaceAudience;
036
037/**
038 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
039 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
040 * {@link Connection}, {@link Table} implementations are retrieved with
041 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
042 *
043 * <pre>
044 * Connection connection = ConnectionFactory.createConnection(config);
045 * Table table = connection.getTable(TableName.valueOf("table1"));
046 * try {
047 *   // Use the table as needed, for a single operation and a single thread
048 * } finally {
049 *   table.close();
050 *   connection.close();
051 * }
052 * </pre>
053 *
054 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
055 * credentials if caller has following two configurations set:
056 * <ul>
057 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
058 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
059 * </ul>
060 * By this way, caller can directly connect to kerberized cluster without caring login and
061 * credentials renewal logic in application.
062 *
063 * <pre>
064 * </pre>
065 *
066 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
067 * implementations.
068 * @see Connection
069 * @since 0.99.0
070 */
071@InterfaceAudience.Public
072public class ConnectionFactory {
073
074  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
075    "hbase.client.async.connection.impl";
076
077  /** No public c.tors */
078  protected ConnectionFactory() {
079  }
080
081  /**
082   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
083   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
084   * connection share zookeeper connection, meta cache, and connections to region servers and
085   * masters. <br>
086   * The caller is responsible for calling {@link Connection#close()} on the returned connection
087   * instance. Typical usage:
088   *
089   * <pre>
090   * Connection connection = ConnectionFactory.createConnection();
091   * Table table = connection.getTable(TableName.valueOf("mytable"));
092   * try {
093   *   table.get(...);
094   *   ...
095   * } finally {
096   *   table.close();
097   *   connection.close();
098   * }
099   * </pre>
100   *
101   * @return Connection object for <code>conf</code>
102   */
103  public static Connection createConnection() throws IOException {
104    Configuration conf = HBaseConfiguration.create();
105    return createConnection(conf, null, AuthUtil.loginClient(conf));
106  }
107
108  /**
109   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
110   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
111   * created from returned connection share zookeeper connection, meta cache, and connections to
112   * region servers and masters. <br>
113   * The caller is responsible for calling {@link Connection#close()} on the returned connection
114   * instance. Typical usage:
115   *
116   * <pre>
117   * Connection connection = ConnectionFactory.createConnection(conf);
118   * Table table = connection.getTable(TableName.valueOf("mytable"));
119   * try {
120   *   table.get(...);
121   *   ...
122   * } finally {
123   *   table.close();
124   *   connection.close();
125   * }
126   * </pre>
127   *
128   * @param conf configuration
129   * @return Connection object for <code>conf</code>
130   */
131  public static Connection createConnection(Configuration conf) throws IOException {
132    return createConnection(conf, null, AuthUtil.loginClient(conf));
133  }
134
135  /**
136   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
137   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
138   * created from returned connection share zookeeper connection, meta cache, and connections to
139   * region servers and masters. <br>
140   * The caller is responsible for calling {@link Connection#close()} on the returned connection
141   * instance. Typical usage:
142   *
143   * <pre>
144   * Connection connection = ConnectionFactory.createConnection(conf);
145   * Table table = connection.getTable(TableName.valueOf("mytable"));
146   * try {
147   *   table.get(...);
148   *   ...
149   * } finally {
150   *   table.close();
151   *   connection.close();
152   * }
153   * </pre>
154   *
155   * @param conf configuration
156   * @param pool the thread pool to use for batch operations
157   * @return Connection object for <code>conf</code>
158   */
159  public static Connection createConnection(Configuration conf, ExecutorService pool)
160    throws IOException {
161    return createConnection(conf, pool, AuthUtil.loginClient(conf));
162  }
163
164  /**
165   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
166   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
167   * created from returned connection share zookeeper connection, meta cache, and connections to
168   * region servers and masters. <br>
169   * The caller is responsible for calling {@link Connection#close()} on the returned connection
170   * instance. Typical usage:
171   *
172   * <pre>
173   * Connection connection = ConnectionFactory.createConnection(conf);
174   * Table table = connection.getTable(TableName.valueOf("table1"));
175   * try {
176   *   table.get(...);
177   *   ...
178   * } finally {
179   *   table.close();
180   *   connection.close();
181   * }
182   * </pre>
183   *
184   * @param conf configuration
185   * @param user the user the connection is for
186   * @return Connection object for <code>conf</code>
187   */
188  public static Connection createConnection(Configuration conf, User user) throws IOException {
189    return createConnection(conf, null, user);
190  }
191
192  /**
193   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
194   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
195   * created from returned connection share zookeeper connection, meta cache, and connections to
196   * region servers and masters. <br>
197   * The caller is responsible for calling {@link Connection#close()} on the returned connection
198   * instance. Typical usage:
199   *
200   * <pre>
201   * Connection connection = ConnectionFactory.createConnection(conf);
202   * Table table = connection.getTable(TableName.valueOf("table1"));
203   * try {
204   *   table.get(...);
205   *   ...
206   * } finally {
207   *   table.close();
208   *   connection.close();
209   * }
210   * </pre>
211   *
212   * @param conf configuration
213   * @param user the user the connection is for
214   * @param pool the thread pool to use for batch operations
215   * @return Connection object for <code>conf</code>
216   */
217  public static Connection createConnection(Configuration conf, ExecutorService pool,
218    final User user) throws IOException {
219    Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
220      ConnectionOverAsyncConnection.class, Connection.class);
221    if (clazz != ConnectionOverAsyncConnection.class) {
222      try {
223        // Default HCM#HCI is not accessible; make it so before invoking.
224        Constructor<?> constructor =
225          clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
226        constructor.setAccessible(true);
227        return user.runAs((PrivilegedExceptionAction<
228          Connection>) () -> (Connection) constructor.newInstance(conf, pool, user));
229      } catch (Exception e) {
230        throw new IOException(e);
231      }
232    } else {
233      return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
234    }
235  }
236
237  /**
238   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
239   * @see #createAsyncConnection(Configuration)
240   * @return AsyncConnection object wrapped by CompletableFuture
241   */
242  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
243    return createAsyncConnection(HBaseConfiguration.create());
244  }
245
246  /**
247   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
248   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
249   * initialize the {@link UserProvider}.
250   * @param conf configuration
251   * @return AsyncConnection object wrapped by CompletableFuture
252   * @see #createAsyncConnection(Configuration, User)
253   * @see UserProvider
254   */
255  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
256    User user;
257    try {
258      user = AuthUtil.loginClient(conf);
259    } catch (IOException e) {
260      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
261      future.completeExceptionally(e);
262      return future;
263    }
264    return createAsyncConnection(conf, user);
265  }
266
267  /**
268   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
269   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
270   * interfaces created from returned connection share zookeeper connection, meta cache, and
271   * connections to region servers and masters.
272   * <p>
273   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
274   * connection instance.
275   * <p>
276   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
277   * as it is thread safe.
278   * @param conf configuration
279   * @param user the user the asynchronous connection is for
280   * @return AsyncConnection object wrapped by CompletableFuture
281   */
282  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
283    final User user) {
284    return TraceUtil.tracedFuture(() -> {
285      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
286      ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
287      addListener(registry.getClusterId(), (clusterId, error) -> {
288        if (error != null) {
289          registry.close();
290          future.completeExceptionally(error);
291          return;
292        }
293        if (clusterId == null) {
294          registry.close();
295          future.completeExceptionally(new IOException("clusterid came back null"));
296          return;
297        }
298        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
299          AsyncConnectionImpl.class, AsyncConnection.class);
300        try {
301          future.complete(
302            user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
303              .newInstance(clazz, conf, registry, clusterId, null, user)));
304        } catch (Exception e) {
305          registry.close();
306          future.completeExceptionally(e);
307        }
308      });
309      return future;
310    }, "ConnectionFactory.createAsyncConnection");
311  }
312}