001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
021
022import java.io.IOException;
023import java.lang.reflect.Constructor;
024import java.security.PrivilegedExceptionAction;
025import java.util.Collections;
026import java.util.Map;
027import java.util.concurrent.CompletableFuture;
028import java.util.concurrent.ExecutorService;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.AuthUtil;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.security.User;
033import org.apache.hadoop.hbase.security.UserProvider;
034import org.apache.hadoop.hbase.trace.TraceUtil;
035import org.apache.hadoop.hbase.util.FutureUtils;
036import org.apache.hadoop.hbase.util.ReflectionUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
041 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
042 * {@link Connection}, {@link Table} implementations are retrieved with
043 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
044 *
045 * <pre>
046 * Connection connection = ConnectionFactory.createConnection(config);
047 * Table table = connection.getTable(TableName.valueOf("table1"));
048 * try {
049 *   // Use the table as needed, for a single operation and a single thread
050 * } finally {
051 *   table.close();
052 *   connection.close();
053 * }
054 * </pre>
055 *
056 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos
057 * credentials if caller has following two configurations set:
058 * <ul>
059 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem
060 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use
061 * </ul>
062 * By this way, caller can directly connect to kerberized cluster without caring login and
063 * credentials renewal logic in application.
064 *
065 * <pre>
066 * </pre>
067 *
068 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
069 * implementations.
070 * @see Connection
071 * @since 0.99.0
072 */
073@InterfaceAudience.Public
074public class ConnectionFactory {
075
076  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
077    "hbase.client.async.connection.impl";
078
079  /** No public c.tors */
080  protected ConnectionFactory() {
081  }
082
083  /**
084   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
085   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
086   * connection share zookeeper connection, meta cache, and connections to region servers and
087   * masters. <br>
088   * The caller is responsible for calling {@link Connection#close()} on the returned connection
089   * instance. Typical usage:
090   *
091   * <pre>
092   * Connection connection = ConnectionFactory.createConnection();
093   * Table table = connection.getTable(TableName.valueOf("mytable"));
094   * try {
095   *   table.get(...);
096   *   ...
097   * } finally {
098   *   table.close();
099   *   connection.close();
100   * }
101   * </pre>
102   *
103   * @return Connection object for <code>conf</code>
104   */
105  public static Connection createConnection() throws IOException {
106    Configuration conf = HBaseConfiguration.create();
107    return createConnection(conf, null, AuthUtil.loginClient(conf));
108  }
109
110  /**
111   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
112   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
113   * created from returned connection share zookeeper connection, meta cache, and connections to
114   * region servers and masters. <br>
115   * The caller is responsible for calling {@link Connection#close()} on the returned connection
116   * instance. Typical usage:
117   *
118   * <pre>
119   * Connection connection = ConnectionFactory.createConnection(conf);
120   * Table table = connection.getTable(TableName.valueOf("mytable"));
121   * try {
122   *   table.get(...);
123   *   ...
124   * } finally {
125   *   table.close();
126   *   connection.close();
127   * }
128   * </pre>
129   *
130   * @param conf configuration
131   * @return Connection object for <code>conf</code>
132   */
133  public static Connection createConnection(Configuration conf) throws IOException {
134    return createConnection(conf, null, AuthUtil.loginClient(conf));
135  }
136
137  /**
138   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
139   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
140   * created from returned connection share zookeeper connection, meta cache, and connections to
141   * region servers and masters. <br>
142   * The caller is responsible for calling {@link Connection#close()} on the returned connection
143   * instance. Typical usage:
144   *
145   * <pre>
146   * Connection connection = ConnectionFactory.createConnection(conf);
147   * Table table = connection.getTable(TableName.valueOf("mytable"));
148   * try {
149   *   table.get(...);
150   *   ...
151   * } finally {
152   *   table.close();
153   *   connection.close();
154   * }
155   * </pre>
156   *
157   * @param conf configuration
158   * @param pool the thread pool to use for batch operations
159   * @return Connection object for <code>conf</code>
160   */
161  public static Connection createConnection(Configuration conf, ExecutorService pool)
162    throws IOException {
163    return createConnection(conf, pool, AuthUtil.loginClient(conf));
164  }
165
166  /**
167   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
168   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
169   * created from returned connection share zookeeper connection, meta cache, and connections to
170   * region servers and masters. <br>
171   * The caller is responsible for calling {@link Connection#close()} on the returned connection
172   * instance. Typical usage:
173   *
174   * <pre>
175   * Connection connection = ConnectionFactory.createConnection(conf);
176   * Table table = connection.getTable(TableName.valueOf("table1"));
177   * try {
178   *   table.get(...);
179   *   ...
180   * } finally {
181   *   table.close();
182   *   connection.close();
183   * }
184   * </pre>
185   *
186   * @param conf configuration
187   * @param user the user the connection is for
188   * @return Connection object for <code>conf</code>
189   */
190  public static Connection createConnection(Configuration conf, User user) throws IOException {
191    return createConnection(conf, null, user);
192  }
193
194  /**
195   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
196   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
197   * created from returned connection share zookeeper connection, meta cache, and connections to
198   * region servers and masters. <br>
199   * The caller is responsible for calling {@link Connection#close()} on the returned connection
200   * instance. Typical usage:
201   *
202   * <pre>
203   * Connection connection = ConnectionFactory.createConnection(conf);
204   * Table table = connection.getTable(TableName.valueOf("table1"));
205   * try {
206   *   table.get(...);
207   *   ...
208   * } finally {
209   *   table.close();
210   *   connection.close();
211   * }
212   * </pre>
213   *
214   * @param conf configuration
215   * @param user the user the connection is for
216   * @param pool the thread pool to use for batch operations
217   * @return Connection object for <code>conf</code>
218   */
219  public static Connection createConnection(Configuration conf, ExecutorService pool,
220    final User user) throws IOException {
221    return createConnection(conf, pool, user, Collections.emptyMap());
222  }
223
224  /**
225   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
226   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
227   * created from returned connection share zookeeper connection, meta cache, and connections to
228   * region servers and masters. <br>
229   * The caller is responsible for calling {@link Connection#close()} on the returned connection
230   * instance. Typical usage:
231   *
232   * <pre>
233   * Connection connection = ConnectionFactory.createConnection(conf);
234   * Table table = connection.getTable(TableName.valueOf("table1"));
235   * try {
236   *   table.get(...);
237   *   ...
238   * } finally {
239   *   table.close();
240   *   connection.close();
241   * }
242   * </pre>
243   *
244   * @param conf                 configuration
245   * @param user                 the user the connection is for
246   * @param pool                 the thread pool to use for batch operations
247   * @param connectionAttributes attributes to be sent along to server during connection establish
248   * @return Connection object for <code>conf</code>
249   */
250  public static Connection createConnection(Configuration conf, ExecutorService pool,
251    final User user, Map<String, byte[]> connectionAttributes) throws IOException {
252    Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
253      ConnectionOverAsyncConnection.class, Connection.class);
254    if (clazz != ConnectionOverAsyncConnection.class) {
255      try {
256        // Default HCM#HCI is not accessible; make it so before invoking.
257        Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
258          ExecutorService.class, User.class, Map.class);
259        constructor.setAccessible(true);
260        return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor
261          .newInstance(conf, pool, user, connectionAttributes));
262      } catch (Exception e) {
263        throw new IOException(e);
264      }
265    } else {
266      return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes))
267        .toConnection();
268    }
269  }
270
271  /**
272   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
273   * @see #createAsyncConnection(Configuration)
274   * @return AsyncConnection object wrapped by CompletableFuture
275   */
276  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
277    return createAsyncConnection(HBaseConfiguration.create());
278  }
279
280  /**
281   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
282   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
283   * initialize the {@link UserProvider}.
284   * @param conf configuration
285   * @return AsyncConnection object wrapped by CompletableFuture
286   * @see #createAsyncConnection(Configuration, User)
287   * @see UserProvider
288   */
289  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
290    User user;
291    try {
292      user = AuthUtil.loginClient(conf);
293    } catch (IOException e) {
294      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
295      future.completeExceptionally(e);
296      return future;
297    }
298    return createAsyncConnection(conf, user);
299  }
300
301  /**
302   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
303   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
304   * interfaces created from returned connection share zookeeper connection, meta cache, and
305   * connections to region servers and masters.
306   * <p>
307   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
308   * connection instance.
309   * <p>
310   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
311   * as it is thread safe.
312   * @param conf configuration
313   * @param user the user the asynchronous connection is for
314   * @return AsyncConnection object wrapped by CompletableFuture
315   */
316  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
317    final User user) {
318    return createAsyncConnection(conf, user, null);
319  }
320
321  /**
322   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
323   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
324   * interfaces created from returned connection share zookeeper connection, meta cache, and
325   * connections to region servers and masters.
326   * <p>
327   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
328   * connection instance.
329   * <p>
330   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
331   * as it is thread safe.
332   * @param conf                 configuration
333   * @param user                 the user the asynchronous connection is for
334   * @param connectionAttributes attributes to be sent along to server during connection establish
335   * @return AsyncConnection object wrapped by CompletableFuture
336   */
337  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
338    final User user, Map<String, byte[]> connectionAttributes) {
339    return TraceUtil.tracedFuture(() -> {
340      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
341      ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user);
342      addListener(registry.getClusterId(), (clusterId, error) -> {
343        if (error != null) {
344          registry.close();
345          future.completeExceptionally(error);
346          return;
347        }
348        if (clusterId == null) {
349          registry.close();
350          future.completeExceptionally(new IOException("clusterid came back null"));
351          return;
352        }
353        Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
354          AsyncConnectionImpl.class, AsyncConnection.class);
355        try {
356          future.complete(
357            user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
358              .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes)));
359        } catch (Exception e) {
360          registry.close();
361          future.completeExceptionally(e);
362        }
363      });
364      return future;
365    }, "ConnectionFactory.createAsyncConnection");
366  }
367}