001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.util.FutureUtils.addListener; 021 022import java.io.IOException; 023import java.lang.reflect.Constructor; 024import java.security.PrivilegedExceptionAction; 025import java.util.concurrent.CompletableFuture; 026import java.util.concurrent.ExecutorService; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.AuthUtil; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.security.UserProvider; 032import org.apache.hadoop.hbase.trace.TraceUtil; 033import org.apache.hadoop.hbase.util.ReflectionUtils; 034import org.apache.yetus.audience.InterfaceAudience; 035 036/** 037 * A non-instantiable class that manages creation of {@link Connection}s. Managing the lifecycle of 038 * the {@link Connection}s to the cluster is the responsibility of the caller. From a 039 * {@link Connection}, {@link Table} implementations are retrieved with 040 * {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example: 041 * 042 * <pre> 043 * Connection connection = ConnectionFactory.createConnection(config); 044 * Table table = connection.getTable(TableName.valueOf("table1")); 045 * try { 046 * // Use the table as needed, for a single operation and a single thread 047 * } finally { 048 * table.close(); 049 * connection.close(); 050 * } 051 * </pre> 052 * 053 * Since 2.2.0, Connection created by ConnectionFactory can contain user-specified kerberos 054 * credentials if caller has following two configurations set: 055 * <ul> 056 * <li>hbase.client.keytab.file, points to a valid keytab on the local filesystem 057 * <li>hbase.client.kerberos.principal, gives the Kerberos principal to use 058 * </ul> 059 * By this way, caller can directly connect to kerberized cluster without caring login and 060 * credentials renewal logic in application. 061 * 062 * <pre> 063 * </pre> 064 * 065 * Similarly, {@link Connection} also returns {@link Admin} and {@link RegionLocator} 066 * implementations. 067 * @see Connection 068 * @since 0.99.0 069 */ 070@InterfaceAudience.Public 071public class ConnectionFactory { 072 073 public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL = 074 "hbase.client.async.connection.impl"; 075 076 /** No public c.tors */ 077 protected ConnectionFactory() { 078 } 079 080 /** 081 * Create a new Connection instance using default HBaseConfiguration. Connection encapsulates all 082 * housekeeping for a connection to the cluster. All tables and interfaces created from returned 083 * connection share zookeeper connection, meta cache, and connections to region servers and 084 * masters. <br> 085 * The caller is responsible for calling {@link Connection#close()} on the returned connection 086 * instance. Typical usage: 087 * 088 * <pre> 089 * Connection connection = ConnectionFactory.createConnection(); 090 * Table table = connection.getTable(TableName.valueOf("mytable")); 091 * try { 092 * table.get(...); 093 * ... 094 * } finally { 095 * table.close(); 096 * connection.close(); 097 * } 098 * </pre> 099 * 100 * @return Connection object for <code>conf</code> 101 */ 102 public static Connection createConnection() throws IOException { 103 Configuration conf = HBaseConfiguration.create(); 104 return createConnection(conf, null, AuthUtil.loginClient(conf)); 105 } 106 107 /** 108 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 109 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 110 * created from returned connection share zookeeper connection, meta cache, and connections to 111 * region servers and masters. <br> 112 * The caller is responsible for calling {@link Connection#close()} on the returned connection 113 * instance. Typical usage: 114 * 115 * <pre> 116 * Connection connection = ConnectionFactory.createConnection(conf); 117 * Table table = connection.getTable(TableName.valueOf("mytable")); 118 * try { 119 * table.get(...); 120 * ... 121 * } finally { 122 * table.close(); 123 * connection.close(); 124 * } 125 * </pre> 126 * 127 * @param conf configuration 128 * @return Connection object for <code>conf</code> 129 */ 130 public static Connection createConnection(Configuration conf) throws IOException { 131 return createConnection(conf, null, AuthUtil.loginClient(conf)); 132 } 133 134 /** 135 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 136 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 137 * created from returned connection share zookeeper connection, meta cache, and connections to 138 * region servers and masters. <br> 139 * The caller is responsible for calling {@link Connection#close()} on the returned connection 140 * instance. Typical usage: 141 * 142 * <pre> 143 * Connection connection = ConnectionFactory.createConnection(conf); 144 * Table table = connection.getTable(TableName.valueOf("mytable")); 145 * try { 146 * table.get(...); 147 * ... 148 * } finally { 149 * table.close(); 150 * connection.close(); 151 * } 152 * </pre> 153 * 154 * @param conf configuration 155 * @param pool the thread pool to use for batch operations 156 * @return Connection object for <code>conf</code> 157 */ 158 public static Connection createConnection(Configuration conf, ExecutorService pool) 159 throws IOException { 160 return createConnection(conf, pool, AuthUtil.loginClient(conf)); 161 } 162 163 /** 164 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 165 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 166 * created from returned connection share zookeeper connection, meta cache, and connections to 167 * region servers and masters. <br> 168 * The caller is responsible for calling {@link Connection#close()} on the returned connection 169 * instance. Typical usage: 170 * 171 * <pre> 172 * Connection connection = ConnectionFactory.createConnection(conf); 173 * Table table = connection.getTable(TableName.valueOf("table1")); 174 * try { 175 * table.get(...); 176 * ... 177 * } finally { 178 * table.close(); 179 * connection.close(); 180 * } 181 * </pre> 182 * 183 * @param conf configuration 184 * @param user the user the connection is for 185 * @return Connection object for <code>conf</code> 186 */ 187 public static Connection createConnection(Configuration conf, User user) throws IOException { 188 return createConnection(conf, null, user); 189 } 190 191 /** 192 * Create a new Connection instance using the passed <code>conf</code> instance. Connection 193 * encapsulates all housekeeping for a connection to the cluster. All tables and interfaces 194 * created from returned connection share zookeeper connection, meta cache, and connections to 195 * region servers and masters. <br> 196 * The caller is responsible for calling {@link Connection#close()} on the returned connection 197 * instance. Typical usage: 198 * 199 * <pre> 200 * Connection connection = ConnectionFactory.createConnection(conf); 201 * Table table = connection.getTable(TableName.valueOf("table1")); 202 * try { 203 * table.get(...); 204 * ... 205 * } finally { 206 * table.close(); 207 * connection.close(); 208 * } 209 * </pre> 210 * 211 * @param conf configuration 212 * @param user the user the connection is for 213 * @param pool the thread pool to use for batch operations 214 * @return Connection object for <code>conf</code> 215 */ 216 public static Connection createConnection(Configuration conf, ExecutorService pool, 217 final User user) throws IOException { 218 return TraceUtil.trace(() -> { 219 String className = conf.get(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, 220 ConnectionImplementation.class.getName()); 221 Class<?> clazz; 222 try { 223 clazz = Class.forName(className); 224 } catch (ClassNotFoundException e) { 225 throw new IOException(e); 226 } 227 try { 228 // Default HCM#HCI is not accessible; make it so before invoking. 229 Constructor<?> constructor = 230 clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class); 231 constructor.setAccessible(true); 232 return user.runAs((PrivilegedExceptionAction< 233 Connection>) () -> (Connection) constructor.newInstance(conf, pool, user)); 234 } catch (Exception e) { 235 throw new IOException(e); 236 } 237 }, () -> TraceUtil.createSpan(ConnectionFactory.class.getSimpleName() + ".createConnection")); 238 } 239 240 /** 241 * Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration. 242 * @see #createAsyncConnection(Configuration) 243 * @return AsyncConnection object wrapped by CompletableFuture 244 */ 245 public static CompletableFuture<AsyncConnection> createAsyncConnection() { 246 return createAsyncConnection(HBaseConfiguration.create()); 247 } 248 249 /** 250 * Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a 251 * User object created by {@link UserProvider}. The given {@code conf} will also be used to 252 * initialize the {@link UserProvider}. 253 * @param conf configuration 254 * @return AsyncConnection object wrapped by CompletableFuture 255 * @see #createAsyncConnection(Configuration, User) 256 * @see UserProvider 257 */ 258 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf) { 259 User user; 260 try { 261 user = AuthUtil.loginClient(conf); 262 } catch (IOException e) { 263 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 264 future.completeExceptionally(e); 265 return future; 266 } 267 return createAsyncConnection(conf, user); 268 } 269 270 /** 271 * Create a new AsyncConnection instance using the passed {@code conf} and {@code user}. 272 * AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and 273 * interfaces created from returned connection share zookeeper connection, meta cache, and 274 * connections to region servers and masters. 275 * <p> 276 * The caller is responsible for calling {@link AsyncConnection#close()} on the returned 277 * connection instance. 278 * <p> 279 * Usually you should only create one AsyncConnection instance in your code and use it everywhere 280 * as it is thread safe. 281 * @param conf configuration 282 * @param user the user the asynchronous connection is for 283 * @return AsyncConnection object wrapped by CompletableFuture 284 */ 285 public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf, 286 final User user) { 287 return TraceUtil.tracedFuture(() -> { 288 CompletableFuture<AsyncConnection> future = new CompletableFuture<>(); 289 ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); 290 addListener(registry.getClusterId(), (clusterId, error) -> { 291 if (error != null) { 292 registry.close(); 293 future.completeExceptionally(error); 294 return; 295 } 296 if (clusterId == null) { 297 registry.close(); 298 future.completeExceptionally(new IOException("clusterid came back null")); 299 return; 300 } 301 Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL, 302 AsyncConnectionImpl.class, AsyncConnection.class); 303 try { 304 future.complete( 305 user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils 306 .newInstance(clazz, conf, registry, clusterId, user))); 307 } catch (Exception e) { 308 registry.close(); 309 future.completeExceptionally(e); 310 } 311 }); 312 return future; 313 }, "ConnectionFactory.createAsyncConnection"); 314 } 315}