001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.token;
020
021import java.io.IOException;
022import java.lang.reflect.UndeclaredThrowableException;
023import java.security.PrivilegedExceptionAction;
024
025import com.google.protobuf.ByteString;
026import com.google.protobuf.ServiceException;
027
028import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.client.ConnectionFactory;
035import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
036import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
037import org.apache.hadoop.hbase.security.User;
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
040import org.apache.hadoop.io.Text;
041import org.apache.hadoop.mapred.JobConf;
042import org.apache.hadoop.mapreduce.Job;
043import org.apache.hadoop.security.token.Token;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.apache.zookeeper.KeeperException;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Utility methods for obtaining authentication tokens.
051 */
052@InterfaceAudience.Public
053public class TokenUtil {
054  // This class is referenced indirectly by User out in common; instances are created by reflection
055  private static final Logger LOG = LoggerFactory.getLogger(TokenUtil.class);
056
057  // Set in TestTokenUtil via reflection
058  private static ServiceException injectedException;
059
060  private static void injectFault() throws ServiceException {
061    if (injectedException != null) {
062      throw injectedException;
063    }
064  }
065
066  /**
067   * Obtain and return an authentication token for the current user.
068   * It was removed in HBase-2.0 but added again as spark code relies on this method to obtain
069   * delegation token
070   * @deprecated Since 2.0.0.
071   */
072  @Deprecated
073  public static Token<AuthenticationTokenIdentifier> obtainToken(Configuration conf)
074      throws IOException {
075    try (Connection connection = ConnectionFactory.createConnection(conf)) {
076      return obtainToken(connection);
077    }
078  }
079
080  /**
081   * See {@link ClientTokenUtil#obtainToken(org.apache.hadoop.hbase.client.Connection)}.
082   * @deprecated External users should not use this method. Please post on
083   *   the HBase dev mailing list if you need this method. Internal
084   *   HBase code should use {@link ClientTokenUtil} instead.
085   */
086  @Deprecated
087  public static Token<AuthenticationTokenIdentifier> obtainToken(
088      Connection conn) throws IOException {
089    Table meta = null;
090    try {
091      injectFault();
092
093      meta = conn.getTable(TableName.META_TABLE_NAME);
094      CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
095      AuthenticationProtos.AuthenticationService.BlockingInterface service =
096          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
097      AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
098          AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
099
100      return toToken(response.getToken());
101    } catch (ServiceException se) {
102      throw ProtobufUtil.handleRemoteException(se);
103    } finally {
104      if (meta != null) {
105        meta.close();
106      }
107    }
108  }
109
110
111  /**
112   * Converts a Token instance (with embedded identifier) to the protobuf representation.
113   *
114   * @param token the Token instance to copy
115   * @return the protobuf Token message
116   */
117  public static AuthenticationProtos.Token toToken(Token<AuthenticationTokenIdentifier> token) {
118    AuthenticationProtos.Token.Builder builder = AuthenticationProtos.Token.newBuilder();
119    builder.setIdentifier(ByteString.copyFrom(token.getIdentifier()));
120    builder.setPassword(ByteString.copyFrom(token.getPassword()));
121    if (token.getService() != null) {
122      builder.setService(ByteString.copyFromUtf8(token.getService().toString()));
123    }
124    return builder.build();
125  }
126
127  /**
128   * Obtain and return an authentication token for the current user.
129   * @param conn The HBase cluster connection
130   * @return the authentication token instance
131   */
132  public static Token<AuthenticationTokenIdentifier> obtainToken(
133      final Connection conn, User user) throws IOException, InterruptedException {
134    return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
135      @Override
136      public Token<AuthenticationTokenIdentifier> run() throws Exception {
137        return obtainToken(conn);
138      }
139    });
140  }
141
142
143  private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
144      throws IOException {
145    return token.getService() != null
146        ? token.getService() : new Text("default");
147  }
148
149  /**
150   * Obtain an authentication token for the given user and add it to the
151   * user's credentials.
152   * @param conn The HBase cluster connection
153   * @param user The user for whom to obtain the token
154   * @throws IOException If making a remote call to the authentication service fails
155   * @throws InterruptedException If executing as the given user is interrupted
156   */
157  public static void obtainAndCacheToken(final Connection conn,
158      User user)
159      throws IOException, InterruptedException {
160    try {
161      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
162
163      if (token == null) {
164        throw new IOException("No token returned for user " + user.getName());
165      }
166      if (LOG.isDebugEnabled()) {
167        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
168            user.getName());
169      }
170      user.addToken(token);
171    } catch (IOException ioe) {
172      throw ioe;
173    } catch (InterruptedException ie) {
174      throw ie;
175    } catch (RuntimeException re) {
176      throw re;
177    } catch (Exception e) {
178      throw new UndeclaredThrowableException(e,
179          "Unexpected exception obtaining token for user " + user.getName());
180    }
181  }
182
183  /**
184   * Obtain an authentication token on behalf of the given user and add it to
185   * the credentials for the given map reduce job.
186   * @param conn The HBase cluster connection
187   * @param user The user for whom to obtain the token
188   * @param job The job instance in which the token should be stored
189   * @throws IOException If making a remote call to the authentication service fails
190   * @throws InterruptedException If executing as the given user is interrupted
191   */
192  public static void obtainTokenForJob(final Connection conn,
193      User user, Job job)
194      throws IOException, InterruptedException {
195    try {
196      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
197
198      if (token == null) {
199        throw new IOException("No token returned for user " + user.getName());
200      }
201      Text clusterId = getClusterId(token);
202      if (LOG.isDebugEnabled()) {
203        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
204            user.getName() + " on cluster " + clusterId.toString());
205      }
206      job.getCredentials().addToken(clusterId, token);
207    } catch (IOException ioe) {
208      throw ioe;
209    } catch (InterruptedException ie) {
210      throw ie;
211    } catch (RuntimeException re) {
212      throw re;
213    } catch (Exception e) {
214      throw new UndeclaredThrowableException(e,
215          "Unexpected exception obtaining token for user " + user.getName());
216    }
217  }
218
219  /**
220   * Obtain an authentication token on behalf of the given user and add it to
221   * the credentials for the given map reduce job.
222   * @param conn The HBase cluster connection
223   * @param user The user for whom to obtain the token
224   * @param job The job configuration in which the token should be stored
225   * @throws IOException If making a remote call to the authentication service fails
226   * @throws InterruptedException If executing as the given user is interrupted
227   */
228  public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
229      throws IOException, InterruptedException {
230    try {
231      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
232
233      if (token == null) {
234        throw new IOException("No token returned for user " + user.getName());
235      }
236      Text clusterId = getClusterId(token);
237      if (LOG.isDebugEnabled()) {
238        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
239            user.getName() + " on cluster " + clusterId.toString());
240      }
241      job.getCredentials().addToken(clusterId, token);
242    } catch (IOException ioe) {
243      throw ioe;
244    } catch (InterruptedException ie) {
245      throw ie;
246    } catch (RuntimeException re) {
247      throw re;
248    } catch (Exception e) {
249      throw new UndeclaredThrowableException(e,
250          "Unexpected exception obtaining token for user "+user.getName());
251    }
252  }
253
254  /**
255   * Checks for an authentication token for the given user, obtaining a new token if necessary,
256   * and adds it to the credentials for the given map reduce job.
257   *
258   * @param conn The HBase cluster connection
259   * @param user The user for whom to obtain the token
260   * @param job The job configuration in which the token should be stored
261   * @throws IOException If making a remote call to the authentication service fails
262   * @throws InterruptedException If executing as the given user is interrupted
263   */
264  public static void addTokenForJob(final Connection conn, final JobConf job, User user)
265      throws IOException, InterruptedException {
266
267    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
268    if (token == null) {
269      token = obtainToken(conn, user);
270    }
271    job.getCredentials().addToken(token.getService(), token);
272  }
273
274  /**
275   * Checks for an authentication token for the given user, obtaining a new token if necessary,
276   * and adds it to the credentials for the given map reduce job.
277   *
278   * @param conn The HBase cluster connection
279   * @param user The user for whom to obtain the token
280   * @param job The job instance in which the token should be stored
281   * @throws IOException If making a remote call to the authentication service fails
282   * @throws InterruptedException If executing as the given user is interrupted
283   */
284  public static void addTokenForJob(final Connection conn, User user, Job job)
285      throws IOException, InterruptedException {
286    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
287    if (token == null) {
288      token = obtainToken(conn, user);
289    }
290    job.getCredentials().addToken(token.getService(), token);
291  }
292
293  /**
294   * Checks if an authentication tokens exists for the connected cluster,
295   * obtaining one if needed and adding it to the user's credentials.
296   *
297   * @param conn The HBase cluster connection
298   * @param user The user for whom to obtain the token
299   * @throws IOException If making a remote call to the authentication service fails
300   * @throws InterruptedException If executing as the given user is interrupted
301   * @return true if the token was added, false if it already existed
302   */
303  public static boolean addTokenIfMissing(Connection conn, User user)
304      throws IOException, InterruptedException {
305    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
306    if (token == null) {
307      token = obtainToken(conn, user);
308      user.getUGI().addToken(token.getService(), token);
309      return true;
310    }
311    return false;
312  }
313
314  /**
315   * Get the authentication token of the user for the cluster specified in the configuration
316   * @return null if the user does not have the token, otherwise the auth token for the cluster.
317   */
318  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
319      throws IOException, InterruptedException {
320    ZKWatcher zkw = new ZKWatcher(conf, "TokenUtil-getAuthToken", null);
321    try {
322      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
323      if (clusterId == null) {
324        throw new IOException("Failed to get cluster ID");
325      }
326      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
327    } catch (KeeperException e) {
328      throw new IOException(e);
329    } finally {
330      zkw.close();
331    }
332  }
333
334  /**
335   * Converts a protobuf Token message back into a Token instance.
336   *
337   * @param proto the protobuf Token message
338   * @return the Token instance
339   */
340  public static Token<AuthenticationTokenIdentifier> toToken(AuthenticationProtos.Token proto) {
341    return new Token<>(
342        proto.hasIdentifier() ? proto.getIdentifier().toByteArray() : null,
343        proto.hasPassword() ? proto.getPassword().toByteArray() : null,
344        AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE,
345        proto.hasService() ? new Text(proto.getService().toStringUtf8()) : null);
346  }
347}