/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of
 * the {@link Connection}s to the cluster is the responsibility of the caller. From a
 * {@link Connection}, {@link Table} implementations are retrieved with
 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
 *
 * <pre>
 * Connection connection = ConnectionFactory.createConnection(config);
 * Table table = connection.getTable(TableName.valueOf("table1"));
 * try {
 *   // Use the table as needed, for a single operation and a single thread
 * } finally {
 *   table.close();
 *   connection.close();
 * }
 * </pre>
 *
 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator}
 * implementations.
 * @see Connection
 * @since 0.99.0
 */
@InterfaceAudience.Public
public class ConnectionFactory {

  /**
   * Configuration key naming the {@link AsyncConnection} implementation class to instantiate in
   * {@link #createAsyncConnection(Configuration, User)}. When unset, {@code AsyncConnectionImpl}
   * is used.
   */
  public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
    "hbase.client.async.connection.impl";

  /** Not meant to be instantiated by clients; all factory methods are static. */
  protected ConnectionFactory() {
  }

  /**
   * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all
   * housekeeping for a connection to the cluster. All tables and interfaces created from returned
   * connection share zookeeper connection, meta cache, and connections to region servers and
   * masters. <br>
   * The caller is responsible for calling {@link Connection#close()} on the returned connection
   * instance. Typical usage:
   *
   * <pre>
   * Connection connection = ConnectionFactory.createConnection();
   * Table table = connection.getTable(TableName.valueOf("mytable"));
   * try {
   *   table.get(...);
   *   ...
   * } finally {
   *   table.close();
   *   connection.close();
   * }
   * </pre>
   *
   * @return Connection object for the default configuration
   * @throws IOException if the connection cannot be created
   */
  public static Connection createConnection() throws IOException {
    return createConnection(HBaseConfiguration.create(), null, null);
  }

  /**
   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
   * created from returned connection share zookeeper connection, meta cache, and connections to
   * region servers and masters. <br>
   * The caller is responsible for calling {@link Connection#close()} on the returned connection
   * instance. Typical usage:
   *
   * <pre>
   * Connection connection = ConnectionFactory.createConnection(conf);
   * Table table = connection.getTable(TableName.valueOf("mytable"));
   * try {
   *   table.get(...);
   *   ...
   * } finally {
   *   table.close();
   *   connection.close();
   * }
   * </pre>
   *
   * @param conf configuration
   * @return Connection object for <code>conf</code>
   * @throws IOException if the connection cannot be created
   */
  public static Connection createConnection(Configuration conf) throws IOException {
    return createConnection(conf, null, null);
  }

  /**
   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
   * created from returned connection share zookeeper connection, meta cache, and connections to
   * region servers and masters. <br>
   * The caller is responsible for calling {@link Connection#close()} on the returned connection
   * instance. Typical usage:
   *
   * <pre>
   * Connection connection = ConnectionFactory.createConnection(conf, pool);
   * Table table = connection.getTable(TableName.valueOf("mytable"));
   * try {
   *   table.get(...);
   *   ...
   * } finally {
   *   table.close();
   *   connection.close();
   * }
   * </pre>
   *
   * @param conf configuration
   * @param pool the thread pool to use for batch operations
   * @return Connection object for <code>conf</code>
   * @throws IOException if the connection cannot be created
   */
  public static Connection createConnection(Configuration conf, ExecutorService pool)
      throws IOException {
    return createConnection(conf, pool, null);
  }

  /**
   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
   * created from returned connection share zookeeper connection, meta cache, and connections to
   * region servers and masters. <br>
   * The caller is responsible for calling {@link Connection#close()} on the returned connection
   * instance. Typical usage:
   *
   * <pre>
   * Connection connection = ConnectionFactory.createConnection(conf, user);
   * Table table = connection.getTable(TableName.valueOf("table1"));
   * try {
   *   table.get(...);
   *   ...
   * } finally {
   *   table.close();
   *   connection.close();
   * }
   * </pre>
   *
   * @param conf configuration
   * @param user the user the connection is for
   * @return Connection object for <code>conf</code>
   * @throws IOException if the connection cannot be created
   */
  public static Connection createConnection(Configuration conf, User user) throws IOException {
    return createConnection(conf, null, user);
  }

  /**
   * Create a new Connection instance using the passed <code>conf</code> instance. Connection
   * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces
   * created from returned connection share zookeeper connection, meta cache, and connections to
   * region servers and masters. <br>
   * The caller is responsible for calling {@link Connection#close()} on the returned connection
   * instance. Typical usage:
   *
   * <pre>
   * Connection connection = ConnectionFactory.createConnection(conf, pool, user);
   * Table table = connection.getTable(TableName.valueOf("table1"));
   * try {
   *   table.get(...);
   *   ...
   * } finally {
   *   table.close();
   *   connection.close();
   * }
   * </pre>
   *
   * The implementation class is read from {@code ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL}
   * in <code>conf</code>, falling back to {@code ConnectionImplementation}, and is instantiated
   * reflectively through its {@code (Configuration, ExecutorService, User)} constructor.
   *
   * @param conf configuration
   * @param pool the thread pool to use for batch operations
   * @param user the user the connection is for; when {@code null} the current user is obtained
   *          from a {@link UserProvider} instantiated with <code>conf</code>
   * @return Connection object for <code>conf</code>
   * @throws IOException if the implementation class cannot be loaded, or if looking up its
   *           constructor or invoking it fails (the underlying failure is attached as the cause)
   */
  public static Connection createConnection(Configuration conf, ExecutorService pool, User user)
      throws IOException {
    if (user == null) {
      UserProvider provider = UserProvider.instantiate(conf);
      user = provider.getCurrent();
    }

    String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
      ConnectionImplementation.class.getName());
    Class<?> clazz;
    try {
      clazz = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
    try {
      // The target constructor is not necessarily public; make it accessible before invoking.
      Constructor<?> constructor =
        clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
      constructor.setAccessible(true);
      return (Connection) constructor.newInstance(conf, pool, user);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
   * @see #createAsyncConnection(Configuration)
   * @return AsyncConnection object wrapped by CompletableFuture
   */
  public static CompletableFuture<AsyncConnection> createAsyncConnection() {
    return createAsyncConnection(HBaseConfiguration.create());
  }

  /**
   * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
   * User object created by {@link UserProvider}. The given {@code conf} will also be used to
   * initialize the {@link UserProvider}.
   * <p>
   * If the current user cannot be determined, the returned future is completed exceptionally with
   * the {@link IOException} raised by the {@link UserProvider}.
   * @param conf configuration
   * @return AsyncConnection object wrapped by CompletableFuture
   * @see #createAsyncConnection(Configuration, User)
   * @see UserProvider
   */
  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) {
    User user;
    try {
      user = UserProvider.instantiate(conf).getCurrent();
    } catch (IOException e) {
      CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
      future.completeExceptionally(e);
      return future;
    }
    return createAsyncConnection(conf, user);
  }

  /**
   * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
   * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
   * interfaces created from returned connection share zookeeper connection, meta cache, and
   * connections to region servers and masters.
   * <p>
   * The caller is responsible for calling {@link AsyncConnection#close()} on the returned
   * connection instance.
   * <p>
   * Usually you should only create one AsyncConnection instance in your code and use it everywhere
   * as it is thread safe.
   * <p>
   * Failures to fetch the cluster id or to instantiate the connection class are not thrown; they
   * complete the returned future exceptionally. In those cases the registry created for the
   * connection is closed here, because no connection instance exists to take ownership of it.
   * @param conf configuration
   * @param user the user the asynchronous connection is for
   * @return AsyncConnection object wrapped by CompletableFuture
   */
  public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
      User user) {
    CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
    AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
    addListener(registry.getClusterId(), (clusterId, error) -> {
      if (error != null) {
        failAndCloseRegistry(future, registry, error);
        return;
      }
      if (clusterId == null) {
        failAndCloseRegistry(future, registry, new IOException("clusterid came back null"));
        return;
      }
      Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
        AsyncConnectionImpl.class, AsyncConnection.class);
      try {
        future.complete(ReflectionUtils.newInstance(clazz, conf, registry, clusterId, user));
      } catch (Exception e) {
        failAndCloseRegistry(future, registry, e);
      }
    });
    return future;
  }

  /**
   * Releases a registry that will never be handed to a connection, then fails {@code future} with
   * {@code cause}. A failure while closing is attached to {@code cause} as a suppressed exception
   * so that it neither hides the primary error nor is silently lost.
   */
  private static void failAndCloseRegistry(CompletableFuture<AsyncConnection> future,
      AsyncRegistry registry, Throwable cause) {
    try {
      registry.close();
    } catch (Exception closeFailure) {
      // addSuppressed rejects self-suppression, so guard against the degenerate case.
      if (closeFailure != cause) {
        cause.addSuppressed(closeFailure);
      }
    }
    future.completeExceptionally(cause);
  }
}