001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.client; 020 021import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 022 023import java.io.IOException; 024import java.lang.reflect.Constructor; 025import java.security.PrivilegedExceptionAction; 026import java.util.concurrent.CompletableFuture; 027import java.util.concurrent.ExecutorService; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.AuthUtil; 030import org.apache.hadoop.hbase.HBaseConfiguration; 031import org.apache.hadoop.hbase.security.User; 032import org.apache.hadoop.hbase.security.UserProvider; 033import org.apache.hadoop.hbase.util.ReflectionUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035 036/** 037 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of 038 * the {@link Connection}s to the cluster is the responsibility of the caller. From a 039 * {@link Connection}, {@link Table} implementations are retrieved with 040 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example: 041 * 042 * <pre> 043 * Connection connection = ConnectionFactory.createConnection(config); 044 * Table table = connection.getTable(TableName.valueOf("table1")); 045 * try { 046 * // Use the table as needed, for a single operation and a single thread 047 * } finally { 048 * table.close(); 049 * connection.close(); 050 * } 051 * </pre> 052 * 053 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos 054 * credentials if caller has following two configurations set: 055 * <ul> 056 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem 057 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use 058 * </ul> 059 * By this way, caller can directly connect to kerberized cluster without caring login and 060 * credentials renewal logic in application. 061 * <pre> 062 * </pre> 063 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator} 064 * implementations. 065 * @see Connection 066 * @since 0.99.0 067 */ 068@InterfaceAudience.Public 069public class ConnectionFactory { 070 071 public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = "hbase.client.async.connection.impl"; 072 073 /** No public c.tors */ 074 protected ConnectionFactory() { 075 } 076 077 /** 078 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 079 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 080 * connection share zookeeper connection, meta cache, and connections to region servers and 081 * masters. <br> 082 * The caller is responsible for calling {@link Connection#close()} on the returned connection 083 * instance. Typical usage: 084 * 085 * <pre> 086 * Connection connection = ConnectionFactory.createConnection(); 087 * Table table = connection.getTable(TableName.valueOf("mytable")); 088 * try { 089 * table.get(...); 090 * ... 091 * } finally { 092 * table.close(); 093 * connection.close(); 094 * } 095 * </pre> 096 * 097 * @return Connection object for <code>conf</code> 098 */ 099 public static Connection createConnection() throws IOException { 100 Configuration conf = HBaseConfiguration.create(); 101 return createConnection(conf, null, AuthUtil.loginClient(conf)); 102 } 103 104 /** 105 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 106 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 107 * created from returned connection share zookeeper connection, meta cache, and connections to 108 * region servers and masters. <br> 109 * The caller is responsible for calling {@link Connection#close()} on the returned connection 110 * instance. Typical usage: 111 * 112 * <pre> 113 * Connection connection = ConnectionFactory.createConnection(conf); 114 * Table table = connection.getTable(TableName.valueOf("mytable")); 115 * try { 116 * table.get(...); 117 * ... 118 * } finally { 119 * table.close(); 120 * connection.close(); 121 * } 122 * </pre> 123 * 124 * @param conf configuration 125 * @return Connection object for <code>conf</code> 126 */ 127 public static Connection createConnection(Configuration conf) throws IOException { 128 return createConnection(conf, null, AuthUtil.loginClient(conf)); 129 } 130 131 /** 132 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 133 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 134 * created from returned connection share zookeeper connection, meta cache, and connections to 135 * region servers and masters. <br> 136 * The caller is responsible for calling {@link Connection#close()} on the returned connection 137 * instance. Typical usage: 138 * 139 * <pre> 140 * Connection connection = ConnectionFactory.createConnection(conf); 141 * Table table = connection.getTable(TableName.valueOf("mytable")); 142 * try { 143 * table.get(...); 144 * ... 145 * } finally { 146 * table.close(); 147 * connection.close(); 148 * } 149 * </pre> 150 * 151 * @param conf configuration 152 * @param pool the thread pool to use for batch operations 153 * @return Connection object for <code>conf</code> 154 */ 155 public static Connection createConnection(Configuration conf, ExecutorService pool) 156 throws IOException { 157 return createConnection(conf, pool, AuthUtil.loginClient(conf)); 158 } 159 160 /** 161 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 162 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 163 * created from returned connection share zookeeper connection, meta cache, and connections to 164 * region servers and masters. <br> 165 * The caller is responsible for calling {@link Connection#close()} on the returned connection 166 * instance. Typical usage: 167 * 168 * <pre> 169 * Connection connection = ConnectionFactory.createConnection(conf); 170 * Table table = connection.getTable(TableName.valueOf("table1")); 171 * try { 172 * table.get(...); 173 * ... 174 * } finally { 175 * table.close(); 176 * connection.close(); 177 * } 178 * </pre> 179 * 180 * @param conf configuration 181 * @param user the user the connection is for 182 * @return Connection object for <code>conf</code> 183 */ 184 public static Connection createConnection(Configuration conf, User user) throws IOException { 185 return createConnection(conf, null, user); 186 } 187 188 /** 189 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 190 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 191 * created from returned connection share zookeeper connection, meta cache, and connections to 192 * region servers and masters. <br> 193 * The caller is responsible for calling {@link Connection#close()} on the returned connection 194 * instance. Typical usage: 195 * 196 * <pre> 197 * Connection connection = ConnectionFactory.createConnection(conf); 198 * Table table = connection.getTable(TableName.valueOf("table1")); 199 * try { 200 * table.get(...); 201 * ... 202 * } finally { 203 * table.close(); 204 * connection.close(); 205 * } 206 * </pre> 207 * 208 * @param conf configuration 209 * @param user the user the connection is for 210 * @param pool the thread pool to use for batch operations 211 * @return Connection object for <code>conf</code> 212 */ 213 public static Connection createConnection(Configuration conf, ExecutorService pool, 214 final User user) throws IOException { 215 String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, 216 ConnectionImplementation.class.getName()); 217 Class<?> clazz; 218 try { 219 clazz = Class.forName(className); 220 } catch (ClassNotFoundException e) { 221 throw new IOException(e); 222 } 223 try { 224 // Default HCM#HCI is not accessible; make it so before invoking. 225 Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class, 226 ExecutorService.class, User.class); 227 constructor.setAccessible(true); 228 return user.runAs( 229 (PrivilegedExceptionAction<Connection>)() -> 230 (Connection) constructor.newInstance(conf, pool, user)); 231 } catch (Exception e) { 232 throw new IOException(e); 233 } 234 } 235 236 /** 237 * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration. 238 * @see #createAsyncConnection(Configuration) 239 * @return AsyncConnection object wrapped by CompletableFuture 240 */ 241 public static CompletableFuture<AsyncConnection> createAsyncConnection() { 242 return createAsyncConnection(HBaseConfiguration.create()); 243 } 244 245 /** 246 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a 247 * User object created by {@link UserProvider}. The given {@code conf} will also be used to 248 * initialize the {@link UserProvider}. 249 * @param conf configuration 250 * @return AsyncConnection object wrapped by CompletableFuture 251 * @see #createAsyncConnection(Configuration, User) 252 * @see UserProvider 253 */ 254 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) { 255 User user; 256 try { 257 user = AuthUtil.loginClient(conf); 258 } catch (IOException e) { 259 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 260 future.completeExceptionally(e); 261 return future; 262 } 263 return createAsyncConnection(conf, user); 264 } 265 266 /** 267 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 268 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 269 * interfaces created from returned connection share zookeeper connection, meta cache, and 270 * connections to region servers and masters. 271 * <p> 272 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 273 * connection instance. 274 * <p> 275 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 276 * as it is thread safe. 277 * @param conf configuration 278 * @param user the user the asynchronous connection is for 279 * @return AsyncConnection object wrapped by CompletableFuture 280 */ 281 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 282 final User user) { 283 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 284 ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); 285 addListener(registry.getClusterId(), (clusterId, error) -> { 286 if (error != null) { 287 registry.close(); 288 future.completeExceptionally(error); 289 return; 290 } 291 if (clusterId == null) { 292 registry.close(); 293 future.completeExceptionally(new IOException("clusterid came back null")); 294 return; 295 } 296 Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, 297 AsyncConnectionImpl.class, AsyncConnection.class); 298 try { 299 future.complete( 300 user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils 301 .newInstance(clazz, conf, registry, clusterId, user))); 302 } catch (Exception e) { 303 registry.close(); 304 future.completeExceptionally(e); 305 } 306 }); 307 return future; 308 } 309}