001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.token;
020
021import java.io.IOException;
022import java.lang.reflect.UndeclaredThrowableException;
023import java.security.PrivilegedExceptionAction;
024
025import com.google.protobuf.ByteString;
026import com.google.protobuf.ServiceException;
027
028import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
035import org.apache.hadoop.hbase.client.ConnectionFactory;
036import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
037import org.apache.hadoop.hbase.security.User;
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
040import org.apache.hadoop.io.Text;
041import org.apache.hadoop.mapred.JobConf;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.security.token.Token;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.apache.zookeeper.KeeperException;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Utility methods for obtaining authentication tokens.
051 */
052@InterfaceAudience.Public
053public class TokenUtil {
054  // This class is referenced indirectly by User out in common; instances are created by reflection
055  private static final Logger LOG = LoggerFactory.getLogger(TokenUtil.class);
056
057  // Set in TestTokenUtil via reflection
058  private static ServiceException injectedException;
059
060  private static void injectFault() throws ServiceException {
061    if (injectedException != null) {
062      throw injectedException;
063    }
064  }
065
066  /**
067   * It was removed in HBase-2.0 but added again as spark code relies on this method to obtain
068   * delegation token
069   * @deprecated Since 2.0.0.
070   */
071  @Deprecated
072  public static Token<AuthenticationTokenIdentifier> obtainToken(Configuration conf)
073      throws IOException {
074    try (Connection connection = ConnectionFactory.createConnection(conf)) {
075      return obtainToken(connection);
076    }
077  }
078
079  /**
080   * Obtain and return an authentication token for the current user.
081   * @param conn The HBase cluster connection
082   * @throws IOException if a remote error or serialization problem occurs.
083   * @return the authentication token instance
084   */
085  public static Token<AuthenticationTokenIdentifier> obtainToken(
086      Connection conn) throws IOException {
087    Table meta = null;
088    try {
089      injectFault();
090
091      meta = conn.getTable(TableName.META_TABLE_NAME);
092      CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
093      AuthenticationProtos.AuthenticationService.BlockingInterface service =
094          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
095      AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
096          AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
097
098      return toToken(response.getToken());
099    } catch (ServiceException se) {
100      throw ProtobufUtil.handleRemoteException(se);
101    } finally {
102      if (meta != null) {
103        meta.close();
104      }
105    }
106  }
107
108
109  /**
110   * Converts a Token instance (with embedded identifier) to the protobuf representation.
111   *
112   * @param token the Token instance to copy
113   * @return the protobuf Token message
114   */
115  public static AuthenticationProtos.Token toToken(Token<AuthenticationTokenIdentifier> token) {
116    AuthenticationProtos.Token.Builder builder = AuthenticationProtos.Token.newBuilder();
117    builder.setIdentifier(ByteString.copyFrom(token.getIdentifier()));
118    builder.setPassword(ByteString.copyFrom(token.getPassword()));
119    if (token.getService() != null) {
120      builder.setService(ByteString.copyFromUtf8(token.getService().toString()));
121    }
122    return builder.build();
123  }
124
125  /**
126   * Obtain and return an authentication token for the current user.
127   * @param conn The HBase cluster connection
128   * @return the authentication token instance
129   */
130  public static Token<AuthenticationTokenIdentifier> obtainToken(
131      final Connection conn, User user) throws IOException, InterruptedException {
132    return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
133      @Override
134      public Token<AuthenticationTokenIdentifier> run() throws Exception {
135        return obtainToken(conn);
136      }
137    });
138  }
139
140
141  private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
142      throws IOException {
143    return token.getService() != null
144        ? token.getService() : new Text("default");
145  }
146
147  /**
148   * Obtain an authentication token for the given user and add it to the
149   * user's credentials.
150   * @param conn The HBase cluster connection
151   * @param user The user for whom to obtain the token
152   * @throws IOException If making a remote call to the authentication service fails
153   * @throws InterruptedException If executing as the given user is interrupted
154   */
155  public static void obtainAndCacheToken(final Connection conn,
156      User user)
157      throws IOException, InterruptedException {
158    try {
159      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
160
161      if (token == null) {
162        throw new IOException("No token returned for user " + user.getName());
163      }
164      if (LOG.isDebugEnabled()) {
165        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
166            user.getName());
167      }
168      user.addToken(token);
169    } catch (IOException ioe) {
170      throw ioe;
171    } catch (InterruptedException ie) {
172      throw ie;
173    } catch (RuntimeException re) {
174      throw re;
175    } catch (Exception e) {
176      throw new UndeclaredThrowableException(e,
177          "Unexpected exception obtaining token for user " + user.getName());
178    }
179  }
180
181  /**
182   * Obtain an authentication token on behalf of the given user and add it to
183   * the credentials for the given map reduce job.
184   * @param conn The HBase cluster connection
185   * @param user The user for whom to obtain the token
186   * @param job The job instance in which the token should be stored
187   * @throws IOException If making a remote call to the authentication service fails
188   * @throws InterruptedException If executing as the given user is interrupted
189   */
190  public static void obtainTokenForJob(final Connection conn,
191      User user, Job job)
192      throws IOException, InterruptedException {
193    try {
194      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
195
196      if (token == null) {
197        throw new IOException("No token returned for user " + user.getName());
198      }
199      Text clusterId = getClusterId(token);
200      if (LOG.isDebugEnabled()) {
201        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
202            user.getName() + " on cluster " + clusterId.toString());
203      }
204      job.getCredentials().addToken(clusterId, token);
205    } catch (IOException ioe) {
206      throw ioe;
207    } catch (InterruptedException ie) {
208      throw ie;
209    } catch (RuntimeException re) {
210      throw re;
211    } catch (Exception e) {
212      throw new UndeclaredThrowableException(e,
213          "Unexpected exception obtaining token for user " + user.getName());
214    }
215  }
216
217  /**
218   * Obtain an authentication token on behalf of the given user and add it to
219   * the credentials for the given map reduce job.
220   * @param conn The HBase cluster connection
221   * @param user The user for whom to obtain the token
222   * @param job The job configuration in which the token should be stored
223   * @throws IOException If making a remote call to the authentication service fails
224   * @throws InterruptedException If executing as the given user is interrupted
225   */
226  public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
227      throws IOException, InterruptedException {
228    try {
229      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
230
231      if (token == null) {
232        throw new IOException("No token returned for user " + user.getName());
233      }
234      Text clusterId = getClusterId(token);
235      if (LOG.isDebugEnabled()) {
236        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
237            user.getName() + " on cluster " + clusterId.toString());
238      }
239      job.getCredentials().addToken(clusterId, token);
240    } catch (IOException ioe) {
241      throw ioe;
242    } catch (InterruptedException ie) {
243      throw ie;
244    } catch (RuntimeException re) {
245      throw re;
246    } catch (Exception e) {
247      throw new UndeclaredThrowableException(e,
248          "Unexpected exception obtaining token for user "+user.getName());
249    }
250  }
251
252  /**
253   * Checks for an authentication token for the given user, obtaining a new token if necessary,
254   * and adds it to the credentials for the given map reduce job.
255   *
256   * @param conn The HBase cluster connection
257   * @param user The user for whom to obtain the token
258   * @param job The job configuration in which the token should be stored
259   * @throws IOException If making a remote call to the authentication service fails
260   * @throws InterruptedException If executing as the given user is interrupted
261   */
262  public static void addTokenForJob(final Connection conn, final JobConf job, User user)
263      throws IOException, InterruptedException {
264
265    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
266    if (token == null) {
267      token = obtainToken(conn, user);
268    }
269    job.getCredentials().addToken(token.getService(), token);
270  }
271
272  /**
273   * Checks for an authentication token for the given user, obtaining a new token if necessary,
274   * and adds it to the credentials for the given map reduce job.
275   *
276   * @param conn The HBase cluster connection
277   * @param user The user for whom to obtain the token
278   * @param job The job instance in which the token should be stored
279   * @throws IOException If making a remote call to the authentication service fails
280   * @throws InterruptedException If executing as the given user is interrupted
281   */
282  public static void addTokenForJob(final Connection conn, User user, Job job)
283      throws IOException, InterruptedException {
284    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
285    if (token == null) {
286      token = obtainToken(conn, user);
287    }
288    job.getCredentials().addToken(token.getService(), token);
289  }
290
291  /**
292   * Checks if an authentication tokens exists for the connected cluster,
293   * obtaining one if needed and adding it to the user's credentials.
294   *
295   * @param conn The HBase cluster connection
296   * @param user The user for whom to obtain the token
297   * @throws IOException If making a remote call to the authentication service fails
298   * @throws InterruptedException If executing as the given user is interrupted
299   * @return true if the token was added, false if it already existed
300   */
301  public static boolean addTokenIfMissing(Connection conn, User user)
302      throws IOException, InterruptedException {
303    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
304    if (token == null) {
305      token = obtainToken(conn, user);
306      user.getUGI().addToken(token.getService(), token);
307      return true;
308    }
309    return false;
310  }
311
312  /**
313   * Get the authentication token of the user for the cluster specified in the configuration
314   * @return null if the user does not have the token, otherwise the auth token for the cluster.
315   */
316  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
317      throws IOException, InterruptedException {
318    ZKWatcher zkw = new ZKWatcher(conf, "TokenUtil-getAuthToken", null);
319    try {
320      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
321      if (clusterId == null) {
322        throw new IOException("Failed to get cluster ID");
323      }
324      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
325    } catch (KeeperException e) {
326      throw new IOException(e);
327    } finally {
328      zkw.close();
329    }
330  }
331
332  /**
333   * Converts a protobuf Token message back into a Token instance.
334   *
335   * @param proto the protobuf Token message
336   * @return the Token instance
337   */
338  public static Token<AuthenticationTokenIdentifier> toToken(AuthenticationProtos.Token proto) {
339    return new Token<>(
340        proto.hasIdentifier() ? proto.getIdentifier().toByteArray() : null,
341        proto.hasPassword() ? proto.getPassword().toByteArray() : null,
342        AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE,
343        proto.hasService() ? new Text(proto.getService().toStringUtf8()) : null);
344  }
345}