001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.security.token; 020 021import java.io.IOException; 022import java.lang.reflect.UndeclaredThrowableException; 023import java.security.PrivilegedExceptionAction; 024 025import com.google.protobuf.ByteString; 026import com.google.protobuf.ServiceException; 027 028import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.TableName; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.Table; 034import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 035import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; 036import org.apache.hadoop.hbase.security.User; 037import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 038import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 039import org.apache.hadoop.io.Text; 040import org.apache.hadoop.mapred.JobConf; 041import org.apache.hadoop.mapreduce.Job; 042import org.apache.hadoop.security.token.Token; 043import org.apache.yetus.audience.InterfaceAudience; 044import org.apache.zookeeper.KeeperException; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * Utility methods for obtaining authentication tokens. 050 */ 051@InterfaceAudience.Public 052public class TokenUtil { 053 // This class is referenced indirectly by User out in common; instances are created by reflection 054 private static final Logger LOG = LoggerFactory.getLogger(TokenUtil.class); 055 056 // Set in TestTokenUtil via reflection 057 private static ServiceException injectedException; 058 059 private static void injectFault() throws ServiceException { 060 if (injectedException != null) { 061 throw injectedException; 062 } 063 } 064 065 /** 066 * Obtain and return an authentication token for the current user. 067 * @param conn The HBase cluster connection 068 * @throws IOException if a remote error or serialization problem occurs. 069 * @return the authentication token instance 070 */ 071 public static Token<AuthenticationTokenIdentifier> obtainToken( 072 Connection conn) throws IOException { 073 Table meta = null; 074 try { 075 injectFault(); 076 077 meta = conn.getTable(TableName.META_TABLE_NAME); 078 CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW); 079 AuthenticationProtos.AuthenticationService.BlockingInterface service = 080 AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel); 081 AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null, 082 AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance()); 083 084 return toToken(response.getToken()); 085 } catch (ServiceException se) { 086 throw ProtobufUtil.handleRemoteException(se); 087 } finally { 088 if (meta != null) { 089 meta.close(); 090 } 091 } 092 } 093 094 095 /** 096 * Converts a Token instance (with embedded identifier) to the protobuf representation. 097 * 098 * @param token the Token instance to copy 099 * @return the protobuf Token message 100 */ 101 public static AuthenticationProtos.Token toToken(Token<AuthenticationTokenIdentifier> token) { 102 AuthenticationProtos.Token.Builder builder = AuthenticationProtos.Token.newBuilder(); 103 builder.setIdentifier(ByteString.copyFrom(token.getIdentifier())); 104 builder.setPassword(ByteString.copyFrom(token.getPassword())); 105 if (token.getService() != null) { 106 builder.setService(ByteString.copyFromUtf8(token.getService().toString())); 107 } 108 return builder.build(); 109 } 110 111 /** 112 * Obtain and return an authentication token for the current user. 113 * @param conn The HBase cluster connection 114 * @return the authentication token instance 115 */ 116 public static Token<AuthenticationTokenIdentifier> obtainToken( 117 final Connection conn, User user) throws IOException, InterruptedException { 118 return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() { 119 @Override 120 public Token<AuthenticationTokenIdentifier> run() throws Exception { 121 return obtainToken(conn); 122 } 123 }); 124 } 125 126 127 private static Text getClusterId(Token<AuthenticationTokenIdentifier> token) 128 throws IOException { 129 return token.getService() != null 130 ? token.getService() : new Text("default"); 131 } 132 133 /** 134 * Obtain an authentication token for the given user and add it to the 135 * user's credentials. 136 * @param conn The HBase cluster connection 137 * @param user The user for whom to obtain the token 138 * @throws IOException If making a remote call to the authentication service fails 139 * @throws InterruptedException If executing as the given user is interrupted 140 */ 141 public static void obtainAndCacheToken(final Connection conn, 142 User user) 143 throws IOException, InterruptedException { 144 try { 145 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 146 147 if (token == null) { 148 throw new IOException("No token returned for user " + user.getName()); 149 } 150 if (LOG.isDebugEnabled()) { 151 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 152 user.getName()); 153 } 154 user.addToken(token); 155 } catch (IOException ioe) { 156 throw ioe; 157 } catch (InterruptedException ie) { 158 throw ie; 159 } catch (RuntimeException re) { 160 throw re; 161 } catch (Exception e) { 162 throw new UndeclaredThrowableException(e, 163 "Unexpected exception obtaining token for user " + user.getName()); 164 } 165 } 166 167 /** 168 * Obtain an authentication token on behalf of the given user and add it to 169 * the credentials for the given map reduce job. 170 * @param conn The HBase cluster connection 171 * @param user The user for whom to obtain the token 172 * @param job The job instance in which the token should be stored 173 * @throws IOException If making a remote call to the authentication service fails 174 * @throws InterruptedException If executing as the given user is interrupted 175 */ 176 public static void obtainTokenForJob(final Connection conn, 177 User user, Job job) 178 throws IOException, InterruptedException { 179 try { 180 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 181 182 if (token == null) { 183 throw new IOException("No token returned for user " + user.getName()); 184 } 185 Text clusterId = getClusterId(token); 186 if (LOG.isDebugEnabled()) { 187 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 188 user.getName() + " on cluster " + clusterId.toString()); 189 } 190 job.getCredentials().addToken(clusterId, token); 191 } catch (IOException ioe) { 192 throw ioe; 193 } catch (InterruptedException ie) { 194 throw ie; 195 } catch (RuntimeException re) { 196 throw re; 197 } catch (Exception e) { 198 throw new UndeclaredThrowableException(e, 199 "Unexpected exception obtaining token for user " + user.getName()); 200 } 201 } 202 203 /** 204 * Obtain an authentication token on behalf of the given user and add it to 205 * the credentials for the given map reduce job. 206 * @param conn The HBase cluster connection 207 * @param user The user for whom to obtain the token 208 * @param job The job configuration in which the token should be stored 209 * @throws IOException If making a remote call to the authentication service fails 210 * @throws InterruptedException If executing as the given user is interrupted 211 */ 212 public static void obtainTokenForJob(final Connection conn, final JobConf job, User user) 213 throws IOException, InterruptedException { 214 try { 215 Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user); 216 217 if (token == null) { 218 throw new IOException("No token returned for user " + user.getName()); 219 } 220 Text clusterId = getClusterId(token); 221 if (LOG.isDebugEnabled()) { 222 LOG.debug("Obtained token " + token.getKind().toString() + " for user " + 223 user.getName() + " on cluster " + clusterId.toString()); 224 } 225 job.getCredentials().addToken(clusterId, token); 226 } catch (IOException ioe) { 227 throw ioe; 228 } catch (InterruptedException ie) { 229 throw ie; 230 } catch (RuntimeException re) { 231 throw re; 232 } catch (Exception e) { 233 throw new UndeclaredThrowableException(e, 234 "Unexpected exception obtaining token for user "+user.getName()); 235 } 236 } 237 238 /** 239 * Checks for an authentication token for the given user, obtaining a new token if necessary, 240 * and adds it to the credentials for the given map reduce job. 241 * 242 * @param conn The HBase cluster connection 243 * @param user The user for whom to obtain the token 244 * @param job The job configuration in which the token should be stored 245 * @throws IOException If making a remote call to the authentication service fails 246 * @throws InterruptedException If executing as the given user is interrupted 247 */ 248 public static void addTokenForJob(final Connection conn, final JobConf job, User user) 249 throws IOException, InterruptedException { 250 251 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 252 if (token == null) { 253 token = obtainToken(conn, user); 254 } 255 job.getCredentials().addToken(token.getService(), token); 256 } 257 258 /** 259 * Checks for an authentication token for the given user, obtaining a new token if necessary, 260 * and adds it to the credentials for the given map reduce job. 261 * 262 * @param conn The HBase cluster connection 263 * @param user The user for whom to obtain the token 264 * @param job The job instance in which the token should be stored 265 * @throws IOException If making a remote call to the authentication service fails 266 * @throws InterruptedException If executing as the given user is interrupted 267 */ 268 public static void addTokenForJob(final Connection conn, User user, Job job) 269 throws IOException, InterruptedException { 270 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 271 if (token == null) { 272 token = obtainToken(conn, user); 273 } 274 job.getCredentials().addToken(token.getService(), token); 275 } 276 277 /** 278 * Checks if an authentication tokens exists for the connected cluster, 279 * obtaining one if needed and adding it to the user's credentials. 280 * 281 * @param conn The HBase cluster connection 282 * @param user The user for whom to obtain the token 283 * @throws IOException If making a remote call to the authentication service fails 284 * @throws InterruptedException If executing as the given user is interrupted 285 * @return true if the token was added, false if it already existed 286 */ 287 public static boolean addTokenIfMissing(Connection conn, User user) 288 throws IOException, InterruptedException { 289 Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user); 290 if (token == null) { 291 token = obtainToken(conn, user); 292 user.getUGI().addToken(token.getService(), token); 293 return true; 294 } 295 return false; 296 } 297 298 /** 299 * Get the authentication token of the user for the cluster specified in the configuration 300 * @return null if the user does not have the token, otherwise the auth token for the cluster. 301 */ 302 private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user) 303 throws IOException, InterruptedException { 304 ZKWatcher zkw = new ZKWatcher(conf, "TokenUtil-getAuthToken", null); 305 try { 306 String clusterId = ZKClusterId.readClusterIdZNode(zkw); 307 if (clusterId == null) { 308 throw new IOException("Failed to get cluster ID"); 309 } 310 return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens()); 311 } catch (KeeperException e) { 312 throw new IOException(e); 313 } finally { 314 zkw.close(); 315 } 316 } 317 318 /** 319 * Converts a protobuf Token message back into a Token instance. 320 * 321 * @param proto the protobuf Token message 322 * @return the Token instance 323 */ 324 public static Token<AuthenticationTokenIdentifier> toToken(AuthenticationProtos.Token proto) { 325 return new Token<>( 326 proto.hasIdentifier() ? proto.getIdentifier().toByteArray() : null, 327 proto.hasPassword() ? proto.getPassword().toByteArray() : null, 328 AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE, 329 proto.hasService() ? new Text(proto.getService().toStringUtf8()) : null); 330 } 331}