001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client;
020
021import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
022
023import java.io.IOException;
024import java.lang.reflect.Constructor;
025import java.util.concurrent.CompletableFuture;
026import java.util.concurrent.ExecutorService;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.security.User;
030import org.apache.hadoop.hbase.security.UserProvider;
031import org.apache.hadoop.hbase.util.ReflectionUtils;
032import org.apache.yetus.audience.InterfaceAudience;
033
034/**
035 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
036 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
037 * {@link Connection}, {@link Table} implementations are retrieved with
038 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
039 *
040 * <pre>
041 * Connection connection = ConnectionFactory.createConnection(config);
042 * Table table = connection.getTable(TableName.valueOf("table1"));
043 * try {
044 *   // Use the table as needed, for a single operation and a single thread
045 * } finally {
046 *   table.close();
047 *   connection.close();
048 * }
049 * </pre>
050 *
051 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
052 * implementations.
053 * @see Connection
054 * @since 0.99.0
055 */
056@InterfaceAudience.Public
057public class ConnectionFactory {
058
059  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl";
060
061  /** No public c.tors */
062  protected ConnectionFactory() {
063  }
064
065  /**
066   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
067   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
068   * connection share zookeeper connection, meta cache, and connections to region servers and
069   * masters. <br>
070   * The caller is responsible for calling {@link Connection#close()} on the returned connection
071   * instance. Typical usage:
072   *
073   * <pre>
074   * Connection connection = ConnectionFactory.createConnection();
075   * Table table = connection.getTable(TableName.valueOf("mytable"));
076   * try {
077   *   table.get(...);
078   *   ...
079   * } finally {
080   *   table.close();
081   *   connection.close();
082   * }
083   * </pre>
084   *
085   * @return Connection object for <code>conf</code>
086   */
087  public static Connection createConnection() throws IOException {
088    return createConnection(HBaseConfiguration.create(), null, null);
089  }
090
091  /**
092   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
093   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
094   * created from returned connection share zookeeper connection, meta cache, and connections to
095   * region servers and masters. <br>
096   * The caller is responsible for calling {@link Connection#close()} on the returned connection
097   * instance. Typical usage:
098   *
099   * <pre>
100   * Connection connection = ConnectionFactory.createConnection(conf);
101   * Table table = connection.getTable(TableName.valueOf("mytable"));
102   * try {
103   *   table.get(...);
104   *   ...
105   * } finally {
106   *   table.close();
107   *   connection.close();
108   * }
109   * </pre>
110   *
111   * @param conf configuration
112   * @return Connection object for <code>conf</code>
113   */
114  public static Connection createConnection(Configuration conf) throws IOException {
115    return createConnection(conf, null, null);
116  }
117
118  /**
119   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
120   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
121   * created from returned connection share zookeeper connection, meta cache, and connections to
122   * region servers and masters. <br>
123   * The caller is responsible for calling {@link Connection#close()} on the returned connection
124   * instance. Typical usage:
125   *
126   * <pre>
127   * Connection connection = ConnectionFactory.createConnection(conf);
128   * Table table = connection.getTable(TableName.valueOf("mytable"));
129   * try {
130   *   table.get(...);
131   *   ...
132   * } finally {
133   *   table.close();
134   *   connection.close();
135   * }
136   * </pre>
137   *
138   * @param conf configuration
139   * @param pool the thread pool to use for batch operations
140   * @return Connection object for <code>conf</code>
141   */
142  public static Connection createConnection(Configuration conf, ExecutorService pool)
143      throws IOException {
144    return createConnection(conf, pool, null);
145  }
146
147  /**
148   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
149   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
150   * created from returned connection share zookeeper connection, meta cache, and connections to
151   * region servers and masters. <br>
152   * The caller is responsible for calling {@link Connection#close()} on the returned connection
153   * instance. Typical usage:
154   *
155   * <pre>
156   * Connection connection = ConnectionFactory.createConnection(conf);
157   * Table table = connection.getTable(TableName.valueOf("table1"));
158   * try {
159   *   table.get(...);
160   *   ...
161   * } finally {
162   *   table.close();
163   *   connection.close();
164   * }
165   * </pre>
166   *
167   * @param conf configuration
168   * @param user the user the connection is for
169   * @return Connection object for <code>conf</code>
170   */
171  public static Connection createConnection(Configuration conf, User user) throws IOException {
172    return createConnection(conf, null, user);
173  }
174
175  /**
176   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
177   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
178   * created from returned connection share zookeeper connection, meta cache, and connections to
179   * region servers and masters. <br>
180   * The caller is responsible for calling {@link Connection#close()} on the returned connection
181   * instance. Typical usage:
182   *
183   * <pre>
184   * Connection connection = ConnectionFactory.createConnection(conf);
185   * Table table = connection.getTable(TableName.valueOf("table1"));
186   * try {
187   *   table.get(...);
188   *   ...
189   * } finally {
190   *   table.close();
191   *   connection.close();
192   * }
193   * </pre>
194   *
195   * @param conf configuration
196   * @param user the user the connection is for
197   * @param pool the thread pool to use for batch operations
198   * @return Connection object for <code>conf</code>
199   */
200  public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
201      throws IOException {
202    if (user == null) {
203      UserProvider provider = UserProvider.instantiate(conf);
204      user = provider.getCurrent();
205    }
206
207    String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
208      ConnectionImplementation.class.getName());
209    Class<?> clazz;
210    try {
211      clazz = Class.forName(className);
212    } catch (ClassNotFoundException e) {
213      throw new IOException(e);
214    }
215    try {
216      // Default HCM#HCI is not accessible; make it so before invoking.
217      Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
218        ExecutorService.class, User.class);
219      constructor.setAccessible(true);
220      return (Connection) constructor.newInstance(conf, pool, user);
221    } catch (Exception e) {
222      throw new IOException(e);
223    }
224  }
225
226  /**
227   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
228   * @see #createAsyncConnection(Configuration)
229   * @return AsyncConnection object wrapped by CompletableFuture
230   */
231  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
232    return createAsyncConnection(HBaseConfiguration.create());
233  }
234
235  /**
236   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
237   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
238   * initialize the {@link UserProvider}.
239   * @param conf configuration
240   * @return AsyncConnection object wrapped by CompletableFuture
241   * @see #createAsyncConnection(Configuration, User)
242   * @see UserProvider
243   */
244  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
245    User user;
246    try {
247      user = UserProvider.instantiate(conf).getCurrent();
248    } catch (IOException e) {
249      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
250      future.completeExceptionally(e);
251      return future;
252    }
253    return createAsyncConnection(conf, user);
254  }
255
256  /**
257   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
258   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
259   * interfaces created from returned connection share zookeeper connection, meta cache, and
260   * connections to region servers and masters.
261   * <p>
262   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
263   * connection instance.
264   * <p>
265   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
266   * as it is thread safe.
267   * @param conf configuration
268   * @param user the user the asynchronous connection is for
269   * @return AsyncConnection object wrapped by CompletableFuture
270   * @throws IOException
271   */
272  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
273      User user) {
274    CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
275    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
276    addListener(registry.getClusterId(), (clusterId, error) -> {
277      if (error != null) {
278        future.completeExceptionally(error);
279        return;
280      }
281      if (clusterId == null) {
282        future.completeExceptionally(new IOException("clusterid came back null"));
283        return;
284      }
285      Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
286        AsyncConnectionImpl.class, AsyncConnection.class);
287      try {
288        future.complete(ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user));
289      } catch (Exception e) {
290        future.completeExceptionally(e);
291      }
292    });
293    return future;
294  }
295}