001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.lang.reflect.Constructor; 024import java.security.PrivilegedExceptionAction; 025import java.util.Collections; 026import java.util.Map; 027import java.util.concurrent.CompletableFuture; 028import java.util.concurrent.ExecutorService; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.AuthUtil; 031import org.apache.hadoop.hbase.HBaseConfiguration; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hadoop.hbase.security.UserProvider; 034import org.apache.hadoop.hbase.trace.TraceUtil; 035import org.apache.hadoop.hbase.util.FutureUtils; 036import org.apache.hadoop.hbase.util.ReflectionUtils; 037import org.apache.yetus.audience.InterfaceAudience; 038 039/** 040 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of 041 * the {@link Connection}s to the cluster is the responsibility of the caller. From a 042 * {@link Connection}, {@link Table} implementations are retrieved with 043 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example: 044 * 045 * <pre> 046 * Connection connection = ConnectionFactory.createConnection(config); 047 * Table table = connection.getTable(TableName.valueOf("table1")); 048 * try { 049 * // Use the table as needed, for a single operation and a single thread 050 * } finally { 051 * table.close(); 052 * connection.close(); 053 * } 054 * </pre> 055 * 056 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos 057 * credentials if caller has following two configurations set: 058 * <ul> 059 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem 060 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use 061 * </ul> 062 * By this way, caller can directly connect to kerberized cluster without caring login and 063 * credentials renewal logic in application. 064 * 065 * <pre> 066 * </pre> 067 * 068 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator} 069 * implementations. 070 * @see Connection 071 * @since 0.99.0 072 */ 073@InterfaceAudience.Public 074public class ConnectionFactory { 075 076 public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = 077 "hbase.client.async.connection.impl"; 078 079 /** No public c.tors */ 080 protected ConnectionFactory() { 081 } 082 083 /** 084 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 085 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 086 * connection share zookeeper connection, meta cache, and connections to region servers and 087 * masters. <br> 088 * The caller is responsible for calling {@link Connection#close()} on the returned connection 089 * instance. Typical usage: 090 * 091 * <pre> 092 * Connection connection = ConnectionFactory.createConnection(); 093 * Table table = connection.getTable(TableName.valueOf("mytable")); 094 * try { 095 * table.get(...); 096 * ... 097 * } finally { 098 * table.close(); 099 * connection.close(); 100 * } 101 * </pre> 102 * 103 * @return Connection object for <code>conf</code> 104 */ 105 public static Connection createConnection() throws IOException { 106 Configuration conf = HBaseConfiguration.create(); 107 return createConnection(conf, null, AuthUtil.loginClient(conf)); 108 } 109 110 /** 111 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 112 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 113 * created from returned connection share zookeeper connection, meta cache, and connections to 114 * region servers and masters. <br> 115 * The caller is responsible for calling {@link Connection#close()} on the returned connection 116 * instance. Typical usage: 117 * 118 * <pre> 119 * Connection connection = ConnectionFactory.createConnection(conf); 120 * Table table = connection.getTable(TableName.valueOf("mytable")); 121 * try { 122 * table.get(...); 123 * ... 124 * } finally { 125 * table.close(); 126 * connection.close(); 127 * } 128 * </pre> 129 * 130 * @param conf configuration 131 * @return Connection object for <code>conf</code> 132 */ 133 public static Connection createConnection(Configuration conf) throws IOException { 134 return createConnection(conf, null, AuthUtil.loginClient(conf)); 135 } 136 137 /** 138 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 139 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 140 * created from returned connection share zookeeper connection, meta cache, and connections to 141 * region servers and masters. <br> 142 * The caller is responsible for calling {@link Connection#close()} on the returned connection 143 * instance. Typical usage: 144 * 145 * <pre> 146 * Connection connection = ConnectionFactory.createConnection(conf); 147 * Table table = connection.getTable(TableName.valueOf("mytable")); 148 * try { 149 * table.get(...); 150 * ... 151 * } finally { 152 * table.close(); 153 * connection.close(); 154 * } 155 * </pre> 156 * 157 * @param conf configuration 158 * @param pool the thread pool to use for batch operations 159 * @return Connection object for <code>conf</code> 160 */ 161 public static Connection createConnection(Configuration conf, ExecutorService pool) 162 throws IOException { 163 return createConnection(conf, pool, AuthUtil.loginClient(conf)); 164 } 165 166 /** 167 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 168 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 169 * created from returned connection share zookeeper connection, meta cache, and connections to 170 * region servers and masters. <br> 171 * The caller is responsible for calling {@link Connection#close()} on the returned connection 172 * instance. Typical usage: 173 * 174 * <pre> 175 * Connection connection = ConnectionFactory.createConnection(conf); 176 * Table table = connection.getTable(TableName.valueOf("table1")); 177 * try { 178 * table.get(...); 179 * ... 180 * } finally { 181 * table.close(); 182 * connection.close(); 183 * } 184 * </pre> 185 * 186 * @param conf configuration 187 * @param user the user the connection is for 188 * @return Connection object for <code>conf</code> 189 */ 190 public static Connection createConnection(Configuration conf, User user) throws IOException { 191 return createConnection(conf, null, user); 192 } 193 194 /** 195 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 196 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 197 * created from returned connection share zookeeper connection, meta cache, and connections to 198 * region servers and masters. <br> 199 * The caller is responsible for calling {@link Connection#close()} on the returned connection 200 * instance. Typical usage: 201 * 202 * <pre> 203 * Connection connection = ConnectionFactory.createConnection(conf); 204 * Table table = connection.getTable(TableName.valueOf("table1")); 205 * try { 206 * table.get(...); 207 * ... 208 * } finally { 209 * table.close(); 210 * connection.close(); 211 * } 212 * </pre> 213 * 214 * @param conf configuration 215 * @param user the user the connection is for 216 * @param pool the thread pool to use for batch operations 217 * @return Connection object for <code>conf</code> 218 */ 219 public static Connection createConnection(Configuration conf, ExecutorService pool, 220 final User user) throws IOException { 221 return createConnection(conf, pool, user, Collections.emptyMap()); 222 } 223 224 /** 225 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 226 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 227 * created from returned connection share zookeeper connection, meta cache, and connections to 228 * region servers and masters. <br> 229 * The caller is responsible for calling {@link Connection#close()} on the returned connection 230 * instance. Typical usage: 231 * 232 * <pre> 233 * Connection connection = ConnectionFactory.createConnection(conf); 234 * Table table = connection.getTable(TableName.valueOf("table1")); 235 * try { 236 * table.get(...); 237 * ... 238 * } finally { 239 * table.close(); 240 * connection.close(); 241 * } 242 * </pre> 243 * 244 * @param conf configuration 245 * @param user the user the connection is for 246 * @param pool the thread pool to use for batch operations 247 * @param connectionAttributes attributes to be sent along to server during connection establish 248 * @return Connection object for <code>conf</code> 249 */ 250 public static Connection createConnection(Configuration conf, ExecutorService pool, 251 final User user, Map<String, byte[]> connectionAttributes) throws IOException { 252 Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 253 ConnectionOverAsyncConnection.class, Connection.class); 254 if (clazz != ConnectionOverAsyncConnection.class) { 255 try { 256 // Default HCM#HCI is not accessible; make it so before invoking. 257 Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, 258 ExecutorService.class, User.class, Map.class); 259 constructor.setAccessible(true); 260 return user.runAs((PrivilegedExceptionAction<Connection>) () -> (Connection) constructor 261 .newInstance(conf, pool, user, connectionAttributes)); 262 } catch (Exception e) { 263 throw new IOException(e); 264 } 265 } else { 266 return FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)) 267 .toConnection(); 268 } 269 } 270 271 /** 272 * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration. 273 * @see #createAsyncConnection(Configuration) 274 * @return AsyncConnection object wrapped by CompletableFuture 275 */ 276 public static CompletableFuture<AsyncConnection> createAsyncConnection() { 277 return createAsyncConnection(HBaseConfiguration.create()); 278 } 279 280 /** 281 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a 282 * User object created by {@link UserProvider}. The given {@code conf} will also be used to 283 * initialize the {@link UserProvider}. 284 * @param conf configuration 285 * @return AsyncConnection object wrapped by CompletableFuture 286 * @see #createAsyncConnection(Configuration, User) 287 * @see UserProvider 288 */ 289 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) { 290 User user; 291 try { 292 user = AuthUtil.loginClient(conf); 293 } catch (IOException e) { 294 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 295 future.completeExceptionally(e); 296 return future; 297 } 298 return createAsyncConnection(conf, user); 299 } 300 301 /** 302 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 303 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 304 * interfaces created from returned connection share zookeeper connection, meta cache, and 305 * connections to region servers and masters. 306 * <p> 307 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 308 * connection instance. 309 * <p> 310 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 311 * as it is thread safe. 312 * @param conf configuration 313 * @param user the user the asynchronous connection is for 314 * @return AsyncConnection object wrapped by CompletableFuture 315 */ 316 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 317 final User user) { 318 return createAsyncConnection(conf, user, null); 319 } 320 321 /** 322 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 323 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 324 * interfaces created from returned connection share zookeeper connection, meta cache, and 325 * connections to region servers and masters. 326 * <p> 327 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 328 * connection instance. 329 * <p> 330 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 331 * as it is thread safe. 332 * @param conf configuration 333 * @param user the user the asynchronous connection is for 334 * @param connectionAttributes attributes to be sent along to server during connection establish 335 * @return AsyncConnection object wrapped by CompletableFuture 336 */ 337 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 338 final User user, Map<String, byte[]> connectionAttributes) { 339 return TraceUtil.tracedFuture(() -> { 340 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 341 ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user); 342 addListener(registry.getClusterId(), (clusterId, error) -> { 343 if (error != null) { 344 registry.close(); 345 future.completeExceptionally(error); 346 return; 347 } 348 if (clusterId == null) { 349 registry.close(); 350 future.completeExceptionally(new IOException("clusterid came back null")); 351 return; 352 } 353 Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, 354 AsyncConnectionImpl.class, AsyncConnection.class); 355 try { 356 future.complete( 357 user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils 358 .newInstance(clazz, conf, registry, clusterId, null, user, connectionAttributes))); 359 } catch (Exception e) { 360 registry.close(); 361 future.completeExceptionally(e); 362 } 363 }); 364 return future; 365 }, "ConnectionFactory.createAsyncConnection"); 366 } 367}