001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.security.token; 020 021import java.io.IOException; 022import java.lang.reflect.UndeclaredThrowableException; 023import java.security.PrivilegedExceptionAction; 024 025import com.google.protobuf.ByteString; 026import com.google.protobuf.ServiceException; 027 028import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; 037import org.apache.hadoop.hbase.security.User; 038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 039import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 040import org.apache.hadoop.io.Text; 041import org.apache.hadoop.mapred.JobConf; 042import org.apache.hadoop.mapreduce.Job; 043import org.apache.hadoop.security.token.Token; 044import org.apache.yetus.audience.InterfaceAudience; 045import org.apache.zookeeper.KeeperException; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Utility methods for obtaining authentication tokens. 051 */ 052@InterfaceAudience.Public 053public class TokenUtil { 054 // This class is referenced indirectly by User out in common; instances are created by reflection 055 private static final Logger LOG = LoggerFactory.getLogger(TokenUtil.class); 056 057 // Set in TestTokenUtil via reflection 058 private static ServiceException injectedException; 059 060 private static void injectFault() throws ServiceException { 061 if (injectedException != null) { 062 throw injectedException; 063 } 064 } 065 066 /** 067 * It was removed in HBase-2.0 but added again as spark code relies on this method to obtain 068 * delegation token 069 * @deprecated Since 2.0.0. 070 */ 071 @Deprecated 072 public static Token<AuthenticationTokenIdentifier> obtainToken(Configuration conf) 073 throws IOException { 074 try (Connection connection = ConnectionFactory.createConnection(conf)) { 075 return obtainToken(connection); 076 } 077 } 078 079 /** 080 * Obtain and return an authentication token for the current user. 081 * @param conn The HBase cluster connection 082 * @throws IOException if a remote error or serialization problem occurs. 083 * @return the authentication token instance 084 */ 085 public static Token<AuthenticationTokenIdentifier> obtainToken( 086 Connection conn) throws IOException { 087 Table meta = null; 088 try { 089 injectFault(); 090 091 meta = conn.getTable(TableName.META_TABLE_NAME); 092 CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); 093 AuthenticationProtos.AuthenticationService.BlockingInterface service = 094 AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); 095 AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null, 096 AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); 097 098 return toToken(response.getToken()); 099 } catch (ServiceException se) { 100 throw ProtobufUtil.handleRemoteException(se); 101 } finally { 102 if (meta != null) { 103 meta.close(); 104 } 105 } 106 } 107 108 109 /** 110 * Converts a Token instance (with embedded identifier) to the protobuf representation. 111 * 112 * @param token the Token instance to copy 113 * @return the protobuf Token message 114 */ 115 public static AuthenticationProtos.Token toToken(Token<AuthenticationTokenIdentifier> token) { 116 AuthenticationProtos.Token.Builder builder = AuthenticationProtos.Token.newBuilder(); 117 builder.setIdentifier(ByteString.copyFrom(token.getIdentifier())); 118 builder.setPassword(ByteString.copyFrom(token.getPassword())); 119 if (token.getService() != null) { 120 builder.setService(ByteString.copyFromUtf8(token.getService().toString())); 121 } 122 return builder.build(); 123 } 124 125 /** 126 * Obtain and return an authentication token for the current user. 127 * @param conn The HBase cluster connection 128 * @return the authentication token instance 129 */ 130 public static Token<AuthenticationTokenIdentifier> obtainToken( 131 final Connection conn, User user) throws IOException, InterruptedException { 132 return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() { 133 @Override 134 public Token<AuthenticationTokenIdentifier> run() throws Exception { 135 return obtainToken(conn); 136 } 137 }); 138 } 139 140 141 private static Text getClusterId(Token<AuthenticationTokenIdentifier> token) 142 throws IOException { 143 return token.getService() != null 144 ? token.getService() : new Text("default"); 145 } 146 147 /** 148 * Obtain an authentication token for the given user and add it to the 149 * user's credentials. 150 * @param conn The HBase cluster connection 151 * @param user The user for whom to obtain the token 152 * @throws IOException If making a remote call to the authentication service fails 153 * @throws InterruptedException If executing as the given user is interrupted 154 */ 155 public static void obtainAndCacheToken(final Connection conn, 156 User user) 157 throws IOException, InterruptedException { 158 try { 159 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 160 161 if (token == null) { 162 throw new IOException("No token returned for user " + user.getName()); 163 } 164 if (LOG.isDebugEnabled()) { 165 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 166 user.getName()); 167 } 168 user.addToken(token); 169 } catch (IOException ioe) { 170 throw ioe; 171 } catch (InterruptedException ie) { 172 throw ie; 173 } catch (RuntimeException re) { 174 throw re; 175 } catch (Exception e) { 176 throw new UndeclaredThrowableException(e, 177 "Unexpected exception obtaining token for user " + user.getName()); 178 } 179 } 180 181 /** 182 * Obtain an authentication token on behalf of the given user and add it to 183 * the credentials for the given map reduce job. 184 * @param conn The HBase cluster connection 185 * @param user The user for whom to obtain the token 186 * @param job The job instance in which the token should be stored 187 * @throws IOException If making a remote call to the authentication service fails 188 * @throws InterruptedException If executing as the given user is interrupted 189 */ 190 public static void obtainTokenForJob(final Connection conn, 191 User user, Job job) 192 throws IOException, InterruptedException { 193 try { 194 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 195 196 if (token == null) { 197 throw new IOException("No token returned for user " + user.getName()); 198 } 199 Text clusterId = getClusterId(token); 200 if (LOG.isDebugEnabled()) { 201 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 202 user.getName() + " on cluster " + clusterId.toString()); 203 } 204 job.getCredentials().addToken(clusterId, token); 205 } catch (IOException ioe) { 206 throw ioe; 207 } catch (InterruptedException ie) { 208 throw ie; 209 } catch (RuntimeException re) { 210 throw re; 211 } catch (Exception e) { 212 throw new UndeclaredThrowableException(e, 213 "Unexpected exception obtaining token for user " + user.getName()); 214 } 215 } 216 217 /** 218 * Obtain an authentication token on behalf of the given user and add it to 219 * the credentials for the given map reduce job. 220 * @param conn The HBase cluster connection 221 * @param user The user for whom to obtain the token 222 * @param job The job configuration in which the token should be stored 223 * @throws IOException If making a remote call to the authentication service fails 224 * @throws InterruptedException If executing as the given user is interrupted 225 */ 226 public static void obtainTokenForJob(final Connection conn, final JobConf job, User user) 227 throws IOException, InterruptedException { 228 try { 229 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 230 231 if (token == null) { 232 throw new IOException("No token returned for user " + user.getName()); 233 } 234 Text clusterId = getClusterId(token); 235 if (LOG.isDebugEnabled()) { 236 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 237 user.getName() + " on cluster " + clusterId.toString()); 238 } 239 job.getCredentials().addToken(clusterId, token); 240 } catch (IOException ioe) { 241 throw ioe; 242 } catch (InterruptedException ie) { 243 throw ie; 244 } catch (RuntimeException re) { 245 throw re; 246 } catch (Exception e) { 247 throw new UndeclaredThrowableException(e, 248 "Unexpected exception obtaining token for user "+user.getName()); 249 } 250 } 251 252 /** 253 * Checks for an authentication token for the given user, obtaining a new token if necessary, 254 * and adds it to the credentials for the given map reduce job. 255 * 256 * @param conn The HBase cluster connection 257 * @param user The user for whom to obtain the token 258 * @param job The job configuration in which the token should be stored 259 * @throws IOException If making a remote call to the authentication service fails 260 * @throws InterruptedException If executing as the given user is interrupted 261 */ 262 public static void addTokenForJob(final Connection conn, final JobConf job, User user) 263 throws IOException, InterruptedException { 264 265 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 266 if (token == null) { 267 token = obtainToken(conn, user); 268 } 269 job.getCredentials().addToken(token.getService(), token); 270 } 271 272 /** 273 * Checks for an authentication token for the given user, obtaining a new token if necessary, 274 * and adds it to the credentials for the given map reduce job. 275 * 276 * @param conn The HBase cluster connection 277 * @param user The user for whom to obtain the token 278 * @param job The job instance in which the token should be stored 279 * @throws IOException If making a remote call to the authentication service fails 280 * @throws InterruptedException If executing as the given user is interrupted 281 */ 282 public static void addTokenForJob(final Connection conn, User user, Job job) 283 throws IOException, InterruptedException { 284 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 285 if (token == null) { 286 token = obtainToken(conn, user); 287 } 288 job.getCredentials().addToken(token.getService(), token); 289 } 290 291 /** 292 * Checks if an authentication tokens exists for the connected cluster, 293 * obtaining one if needed and adding it to the user's credentials. 294 * 295 * @param conn The HBase cluster connection 296 * @param user The user for whom to obtain the token 297 * @throws IOException If making a remote call to the authentication service fails 298 * @throws InterruptedException If executing as the given user is interrupted 299 * @return true if the token was added, false if it already existed 300 */ 301 public static boolean addTokenIfMissing(Connection conn, User user) 302 throws IOException, InterruptedException { 303 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 304 if (token == null) { 305 token = obtainToken(conn, user); 306 user.getUGI().addToken(token.getService(), token); 307 return true; 308 } 309 return false; 310 } 311 312 /** 313 * Get the authentication token of the user for the cluster specified in the configuration 314 * @return null if the user does not have the token, otherwise the auth token for the cluster. 315 */ 316 private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user) 317 throws IOException, InterruptedException { 318 ZKWatcher zkw = new ZKWatcher(conf, "TokenUtil-getAuthToken", null); 319 try { 320 String clusterId = ZKClusterId.readClusterIdZNode(zkw); 321 if (clusterId == null) { 322 throw new IOException("Failed to get cluster ID"); 323 } 324 return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens()); 325 } catch (KeeperException e) { 326 throw new IOException(e); 327 } finally { 328 zkw.close(); 329 } 330 } 331 332 /** 333 * Converts a protobuf Token message back into a Token instance. 334 * 335 * @param proto the protobuf Token message 336 * @return the Token instance 337 */ 338 public static Token<AuthenticationTokenIdentifier> toToken(AuthenticationProtos.Token proto) { 339 return new Token<>( 340 proto.hasIdentifier() ? proto.getIdentifier().toByteArray() : null, 341 proto.hasPassword() ? proto.getPassword().toByteArray() : null, 342 AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE, 343 proto.hasService() ? new Text(proto.getService().toStringUtf8()) : null); 344 } 345}