001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.security.token; 020 021import java.io.IOException; 022import java.lang.reflect.UndeclaredThrowableException; 023import java.security.PrivilegedExceptionAction; 024 025import com.google.protobuf.ByteString; 026import com.google.protobuf.ServiceException; 027 028import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.client.ConnectionFactory; 035import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 036import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; 037import org.apache.hadoop.hbase.security.User; 038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 039import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 040import org.apache.hadoop.io.Text; 041import org.apache.hadoop.mapred.JobConf; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.security.token.Token; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.apache.zookeeper.KeeperException; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Utility methods for obtaining authentication tokens. 051 */ 052@InterfaceAudience.Public 053public class TokenUtil { 054 // This class is referenced indirectly by User out in common; instances are created by reflection 055 private static final Logger LOG = LoggerFactory.getLogger(TokenUtil.class); 056 057 // Set in TestTokenUtil via reflection 058 private static ServiceException injectedException; 059 060 private static void injectFault() throws ServiceException { 061 if (injectedException != null) { 062 throw injectedException; 063 } 064 } 065 066 /** 067 * Obtain and return an authentication token for the current user. 068 * It was removed in HBase-2.0 but added again as spark code relies on this method to obtain 069 * delegation token 070 * @deprecated Since 2.0.0. 071 */ 072 @Deprecated 073 public static Token<AuthenticationTokenIdentifier> obtainToken(Configuration conf) 074 throws IOException { 075 try (Connection connection = ConnectionFactory.createConnection(conf)) { 076 return obtainToken(connection); 077 } 078 } 079 080 /** 081 * See {@link ClientTokenUtil#obtainToken(org.apache.hadoop.hbase.client.Connection)}. 082 * @deprecated External users should not use this method. Please post on 083 * the HBase dev mailing list if you need this method. Internal 084 * HBase code should use {@link ClientTokenUtil} instead. 085 */ 086 @Deprecated 087 public static Token<AuthenticationTokenIdentifier> obtainToken( 088 Connection conn) throws IOException { 089 Table meta = null; 090 try { 091 injectFault(); 092 093 meta = conn.getTable(TableName.META_TABLE_NAME); 094 CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); 095 AuthenticationProtos.AuthenticationService.BlockingInterface service = 096 AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); 097 AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null, 098 AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); 099 100 return toToken(response.getToken()); 101 } catch (ServiceException se) { 102 throw ProtobufUtil.handleRemoteException(se); 103 } finally { 104 if (meta != null) { 105 meta.close(); 106 } 107 } 108 } 109 110 111 /** 112 * Converts a Token instance (with embedded identifier) to the protobuf representation. 113 * 114 * @param token the Token instance to copy 115 * @return the protobuf Token message 116 */ 117 public static AuthenticationProtos.Token toToken(Token<AuthenticationTokenIdentifier> token) { 118 AuthenticationProtos.Token.Builder builder = AuthenticationProtos.Token.newBuilder(); 119 builder.setIdentifier(ByteString.copyFrom(token.getIdentifier())); 120 builder.setPassword(ByteString.copyFrom(token.getPassword())); 121 if (token.getService() != null) { 122 builder.setService(ByteString.copyFromUtf8(token.getService().toString())); 123 } 124 return builder.build(); 125 } 126 127 /** 128 * Obtain and return an authentication token for the current user. 129 * @param conn The HBase cluster connection 130 * @return the authentication token instance 131 */ 132 public static Token<AuthenticationTokenIdentifier> obtainToken( 133 final Connection conn, User user) throws IOException, InterruptedException { 134 return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() { 135 @Override 136 public Token<AuthenticationTokenIdentifier> run() throws Exception { 137 return obtainToken(conn); 138 } 139 }); 140 } 141 142 143 private static Text getClusterId(Token<AuthenticationTokenIdentifier> token) 144 throws IOException { 145 return token.getService() != null 146 ? token.getService() : new Text("default"); 147 } 148 149 /** 150 * Obtain an authentication token for the given user and add it to the 151 * user's credentials. 152 * @param conn The HBase cluster connection 153 * @param user The user for whom to obtain the token 154 * @throws IOException If making a remote call to the authentication service fails 155 * @throws InterruptedException If executing as the given user is interrupted 156 */ 157 public static void obtainAndCacheToken(final Connection conn, 158 User user) 159 throws IOException, InterruptedException { 160 try { 161 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 162 163 if (token == null) { 164 throw new IOException("No token returned for user " + user.getName()); 165 } 166 if (LOG.isDebugEnabled()) { 167 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 168 user.getName()); 169 } 170 user.addToken(token); 171 } catch (IOException ioe) { 172 throw ioe; 173 } catch (InterruptedException ie) { 174 throw ie; 175 } catch (RuntimeException re) { 176 throw re; 177 } catch (Exception e) { 178 throw new UndeclaredThrowableException(e, 179 "Unexpected exception obtaining token for user " + user.getName()); 180 } 181 } 182 183 /** 184 * Obtain an authentication token on behalf of the given user and add it to 185 * the credentials for the given map reduce job. 186 * @param conn The HBase cluster connection 187 * @param user The user for whom to obtain the token 188 * @param job The job instance in which the token should be stored 189 * @throws IOException If making a remote call to the authentication service fails 190 * @throws InterruptedException If executing as the given user is interrupted 191 */ 192 public static void obtainTokenForJob(final Connection conn, 193 User user, Job job) 194 throws IOException, InterruptedException { 195 try { 196 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 197 198 if (token == null) { 199 throw new IOException("No token returned for user " + user.getName()); 200 } 201 Text clusterId = getClusterId(token); 202 if (LOG.isDebugEnabled()) { 203 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 204 user.getName() + " on cluster " + clusterId.toString()); 205 } 206 job.getCredentials().addToken(clusterId, token); 207 } catch (IOException ioe) { 208 throw ioe; 209 } catch (InterruptedException ie) { 210 throw ie; 211 } catch (RuntimeException re) { 212 throw re; 213 } catch (Exception e) { 214 throw new UndeclaredThrowableException(e, 215 "Unexpected exception obtaining token for user " + user.getName()); 216 } 217 } 218 219 /** 220 * Obtain an authentication token on behalf of the given user and add it to 221 * the credentials for the given map reduce job. 222 * @param conn The HBase cluster connection 223 * @param user The user for whom to obtain the token 224 * @param job The job configuration in which the token should be stored 225 * @throws IOException If making a remote call to the authentication service fails 226 * @throws InterruptedException If executing as the given user is interrupted 227 */ 228 public static void obtainTokenForJob(final Connection conn, final JobConf job, User user) 229 throws IOException, InterruptedException { 230 try { 231 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 232 233 if (token == null) { 234 throw new IOException("No token returned for user " + user.getName()); 235 } 236 Text clusterId = getClusterId(token); 237 if (LOG.isDebugEnabled()) { 238 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 239 user.getName() + " on cluster " + clusterId.toString()); 240 } 241 job.getCredentials().addToken(clusterId, token); 242 } catch (IOException ioe) { 243 throw ioe; 244 } catch (InterruptedException ie) { 245 throw ie; 246 } catch (RuntimeException re) { 247 throw re; 248 } catch (Exception e) { 249 throw new UndeclaredThrowableException(e, 250 "Unexpected exception obtaining token for user "+user.getName()); 251 } 252 } 253 254 /** 255 * Checks for an authentication token for the given user, obtaining a new token if necessary, 256 * and adds it to the credentials for the given map reduce job. 257 * 258 * @param conn The HBase cluster connection 259 * @param user The user for whom to obtain the token 260 * @param job The job configuration in which the token should be stored 261 * @throws IOException If making a remote call to the authentication service fails 262 * @throws InterruptedException If executing as the given user is interrupted 263 */ 264 public static void addTokenForJob(final Connection conn, final JobConf job, User user) 265 throws IOException, InterruptedException { 266 267 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 268 if (token == null) { 269 token = obtainToken(conn, user); 270 } 271 job.getCredentials().addToken(token.getService(), token); 272 } 273 274 /** 275 * Checks for an authentication token for the given user, obtaining a new token if necessary, 276 * and adds it to the credentials for the given map reduce job. 277 * 278 * @param conn The HBase cluster connection 279 * @param user The user for whom to obtain the token 280 * @param job The job instance in which the token should be stored 281 * @throws IOException If making a remote call to the authentication service fails 282 * @throws InterruptedException If executing as the given user is interrupted 283 */ 284 public static void addTokenForJob(final Connection conn, User user, Job job) 285 throws IOException, InterruptedException { 286 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 287 if (token == null) { 288 token = obtainToken(conn, user); 289 } 290 job.getCredentials().addToken(token.getService(), token); 291 } 292 293 /** 294 * Checks if an authentication tokens exists for the connected cluster, 295 * obtaining one if needed and adding it to the user's credentials. 296 * 297 * @param conn The HBase cluster connection 298 * @param user The user for whom to obtain the token 299 * @throws IOException If making a remote call to the authentication service fails 300 * @throws InterruptedException If executing as the given user is interrupted 301 * @return true if the token was added, false if it already existed 302 */ 303 public static boolean addTokenIfMissing(Connection conn, User user) 304 throws IOException, InterruptedException { 305 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 306 if (token == null) { 307 token = obtainToken(conn, user); 308 user.getUGI().addToken(token.getService(), token); 309 return true; 310 } 311 return false; 312 } 313 314 /** 315 * Get the authentication token of the user for the cluster specified in the configuration 316 * @return null if the user does not have the token, otherwise the auth token for the cluster. 317 */ 318 private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user) 319 throws IOException, InterruptedException { 320 ZKWatcher zkw = new ZKWatcher(conf, "TokenUtil-getAuthToken", null); 321 try { 322 String clusterId = ZKClusterId.readClusterIdZNode(zkw); 323 if (clusterId == null) { 324 throw new IOException("Failed to get cluster ID"); 325 } 326 return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens()); 327 } catch (KeeperException e) { 328 throw new IOException(e); 329 } finally { 330 zkw.close(); 331 } 332 } 333 334 /** 335 * Converts a protobuf Token message back into a Token instance. 336 * 337 * @param proto the protobuf Token message 338 * @return the Token instance 339 */ 340 public static Token<AuthenticationTokenIdentifier> toToken(AuthenticationProtos.Token proto) { 341 return new Token<>( 342 proto.hasIdentifier() ? proto.getIdentifier().toByteArray() : null, 343 proto.hasPassword() ? proto.getPassword().toByteArray() : null, 344 AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE, 345 proto.hasService() ? new Text(proto.getService().toStringUtf8()) : null); 346 } 347}