001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.lang.reflect.Constructor; 024import java.security.PrivilegedExceptionAction; 025import java.util.concurrent.CompletableFuture; 026import java.util.concurrent.ExecutorService; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.AuthUtil; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.security.UserProvider; 032import org.apache.hadoop.hbase.trace.TraceUtil; 033import org.apache.hadoop.hbase.util.FutureUtils; 034import org.apache.hadoop.hbase.util.ReflectionUtils; 035import org.apache.yetus.audience.InterfaceAudience; 036 037/** 038 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of 039 * the {@link Connection}s to the cluster is the responsibility of the caller. From a 040 * {@link Connection}, {@link Table} implementations are retrieved with 041 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example: 042 * 043 * <pre> 044 * Connection connection = ConnectionFactory.createConnection(config); 045 * Table table = connection.getTable(TableName.valueOf("table1")); 046 * try { 047 * // Use the table as needed, for a single operation and a single thread 048 * } finally { 049 * table.close(); 050 * connection.close(); 051 * } 052 * </pre> 053 * 054 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos 055 * credentials if caller has following two configurations set: 056 * <ul> 057 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem 058 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use 059 * </ul> 060 * By this way, caller can directly connect to kerberized cluster without caring login and 061 * credentials renewal logic in application. 062 * 063 * <pre> 064 * </pre> 065 * 066 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator} 067 * implementations. 068 * @see Connection 069 * @since 0.99.0 070 */ 071@InterfaceAudience.Public 072public class ConnectionFactory { 073 074 public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = 075 "hbase.client.async.connection.impl"; 076 077 /** No public c.tors */ 078 protected ConnectionFactory() { 079 } 080 081 /** 082 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 083 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 084 * connection share zookeeper connection, meta cache, and connections to region servers and 085 * masters. <br> 086 * The caller is responsible for calling {@link Connection#close()} on the returned connection 087 * instance. Typical usage: 088 * 089 * <pre> 090 * Connection connection = ConnectionFactory.createConnection(); 091 * Table table = connection.getTable(TableName.valueOf("mytable")); 092 * try { 093 * table.get(...); 094 * ... 095 * } finally { 096 * table.close(); 097 * connection.close(); 098 * } 099 * </pre> 100 * 101 * @return Connection object for <code>conf</code> 102 */ 103 public static Connection createConnection() throws IOException { 104 Configuration conf = HBaseConfiguration.create(); 105 return createConnection(conf, null, AuthUtil.loginClient(conf)); 106 } 107 108 /** 109 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 110 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 111 * created from returned connection share zookeeper connection, meta cache, and connections to 112 * region servers and masters. <br> 113 * The caller is responsible for calling {@link Connection#close()} on the returned connection 114 * instance. Typical usage: 115 * 116 * <pre> 117 * Connection connection = ConnectionFactory.createConnection(conf); 118 * Table table = connection.getTable(TableName.valueOf("mytable")); 119 * try { 120 * table.get(...); 121 * ... 122 * } finally { 123 * table.close(); 124 * connection.close(); 125 * } 126 * </pre> 127 * 128 * @param conf configuration 129 * @return Connection object for <code>conf</code> 130 */ 131 public static Connection createConnection(Configuration conf) throws IOException { 132 return createConnection(conf, null, AuthUtil.loginClient(conf)); 133 } 134 135 /** 136 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 137 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 138 * created from returned connection share zookeeper connection, meta cache, and connections to 139 * region servers and masters. <br> 140 * The caller is responsible for calling {@link Connection#close()} on the returned connection 141 * instance. Typical usage: 142 * 143 * <pre> 144 * Connection connection = ConnectionFactory.createConnection(conf); 145 * Table table = connection.getTable(TableName.valueOf("mytable")); 146 * try { 147 * table.get(...); 148 * ... 149 * } finally { 150 * table.close(); 151 * connection.close(); 152 * } 153 * </pre> 154 * 155 * @param conf configuration 156 * @param pool the thread pool to use for batch operations 157 * @return Connection object for <code>conf</code> 158 */ 159 public static Connection createConnection(Configuration conf, ExecutorService pool) 160 throws IOException { 161 return createConnection(conf, pool, AuthUtil.loginClient(conf)); 162 } 163 164 /** 165 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 166 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 167 * created from returned connection share zookeeper connection, meta cache, and connections to 168 * region servers and masters. <br> 169 * The caller is responsible for calling {@link Connection#close()} on the returned connection 170 * instance. Typical usage: 171 * 172 * <pre> 173 * Connection connection = ConnectionFactory.createConnection(conf); 174 * Table table = connection.getTable(TableName.valueOf("table1")); 175 * try { 176 * table.get(...); 177 * ... 178 * } finally { 179 * table.close(); 180 * connection.close(); 181 * } 182 * </pre> 183 * 184 * @param conf configuration 185 * @param user the user the connection is for 186 * @return Connection object for <code>conf</code> 187 */ 188 public static Connection createConnection(Configuration conf, User user) throws IOException { 189 return createConnection(conf, null, user); 190 } 191 192 /** 193 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 194 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 195 * created from returned connection share zookeeper connection, meta cache, and connections to 196 * region servers and masters. <br> 197 * The caller is responsible for calling {@link Connection#close()} on the returned connection 198 * instance. Typical usage: 199 * 200 * <pre> 201 * Connection connection = ConnectionFactory.createConnection(conf); 202 * Table table = connection.getTable(TableName.valueOf("table1")); 203 * try { 204 * table.get(...); 205 * ... 206 * } finally { 207 * table.close(); 208 * connection.close(); 209 * } 210 * </pre> 211 * 212 * @param conf configuration 213 * @param user the user the connection is for 214 * @param pool the thread pool to use for batch operations 215 * @return Connection object for <code>conf</code> 216 */ 217 public static Connection createConnection(Configuration conf, ExecutorService pool, 218 final User user) throws IOException { 219 Class<?> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 220 ConnectionOverAsyncConnection.class, Connection.class); 221 if (clazz != ConnectionOverAsyncConnection.class) { 222 try { 223 // Default HCM#HCI is not accessible; make it so before invoking. 224 Constructor<?> constructor = 225 clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class); 226 constructor.setAccessible(true); 227 return user.runAs((PrivilegedExceptionAction< 228 Connection>) () -> (Connection) constructor.newInstance(conf, pool, user)); 229 } catch (Exception e) { 230 throw new IOException(e); 231 } 232 } else { 233 return FutureUtils.get(createAsyncConnection(conf, user)).toConnection(); 234 } 235 } 236 237 /** 238 * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration. 239 * @see #createAsyncConnection(Configuration) 240 * @return AsyncConnection object wrapped by CompletableFuture 241 */ 242 public static CompletableFuture<AsyncConnection> createAsyncConnection() { 243 return createAsyncConnection(HBaseConfiguration.create()); 244 } 245 246 /** 247 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a 248 * User object created by {@link UserProvider}. The given {@code conf} will also be used to 249 * initialize the {@link UserProvider}. 250 * @param conf configuration 251 * @return AsyncConnection object wrapped by CompletableFuture 252 * @see #createAsyncConnection(Configuration, User) 253 * @see UserProvider 254 */ 255 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) { 256 User user; 257 try { 258 user = AuthUtil.loginClient(conf); 259 } catch (IOException e) { 260 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 261 future.completeExceptionally(e); 262 return future; 263 } 264 return createAsyncConnection(conf, user); 265 } 266 267 /** 268 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 269 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 270 * interfaces created from returned connection share zookeeper connection, meta cache, and 271 * connections to region servers and masters. 272 * <p> 273 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 274 * connection instance. 275 * <p> 276 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 277 * as it is thread safe. 278 * @param conf configuration 279 * @param user the user the asynchronous connection is for 280 * @return AsyncConnection object wrapped by CompletableFuture 281 */ 282 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 283 final User user) { 284 return TraceUtil.tracedFuture(() -> { 285 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 286 ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); 287 addListener(registry.getClusterId(), (clusterId, error) -> { 288 if (error != null) { 289 registry.close(); 290 future.completeExceptionally(error); 291 return; 292 } 293 if (clusterId == null) { 294 registry.close(); 295 future.completeExceptionally(new IOException("clusterid came back null")); 296 return; 297 } 298 Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, 299 AsyncConnectionImpl.class, AsyncConnection.class); 300 try { 301 future.complete( 302 user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils 303 .newInstance(clazz, conf, registry, clusterId, null, user))); 304 } catch (Exception e) { 305 registry.close(); 306 future.completeExceptionally(e); 307 } 308 }); 309 return future; 310 }, "ConnectionFactory.createAsyncConnection"); 311 } 312}