001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.token;
020
021import java.io.IOException;
022import java.lang.reflect.UndeclaredThrowableException;
023import java.security.PrivilegedExceptionAction;
024
025import com.google.protobuf.ByteString;
026import com.google.protobuf.ServiceException;
027
028import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HConstants;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Connection;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
035import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
036import org.apache.hadoop.hbase.security.User;
037import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
038import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
039import org.apache.hadoop.io.Text;
040import org.apache.hadoop.mapred.JobConf;
041import org.apache.hadoop.mapreduce.Job;
042import org.apache.hadoop.security.token.Token;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.apache.zookeeper.KeeperException;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Utility methods for obtaining authentication tokens.
050 */
051@InterfaceAudience.Public
052public class TokenUtil {
053  // This class is referenced indirectly by User out in common; instances are created by reflection
054  private static final Logger LOG = LoggerFactory.getLogger(TokenUtil.class);
055
056  // Set in TestTokenUtil via reflection
057  private static ServiceException injectedException;
058
059  private static void injectFault() throws ServiceException {
060    if (injectedException != null) {
061      throw injectedException;
062    }
063  }
064
065  /**
066   * Obtain and return an authentication token for the current user.
067   * @param conn The HBase cluster connection
068   * @throws IOException if a remote error or serialization problem occurs.
069   * @return the authentication token instance
070   */
071  public static Token<AuthenticationTokenIdentifier> obtainToken(
072      Connection conn) throws IOException {
073    Table meta = null;
074    try {
075      injectFault();
076
077      meta = conn.getTable(TableName.META_TABLE_NAME);
078      CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
079      AuthenticationProtos.AuthenticationService.BlockingInterface service =
080          AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
081      AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
082          AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
083
084      return toToken(response.getToken());
085    } catch (ServiceException se) {
086      throw ProtobufUtil.handleRemoteException(se);
087    } finally {
088      if (meta != null) {
089        meta.close();
090      }
091    }
092  }
093
094
095  /**
096   * Converts a Token instance (with embedded identifier) to the protobuf representation.
097   *
098   * @param token the Token instance to copy
099   * @return the protobuf Token message
100   */
101  public static AuthenticationProtos.Token toToken(Token<AuthenticationTokenIdentifier> token) {
102    AuthenticationProtos.Token.Builder builder = AuthenticationProtos.Token.newBuilder();
103    builder.setIdentifier(ByteString.copyFrom(token.getIdentifier()));
104    builder.setPassword(ByteString.copyFrom(token.getPassword()));
105    if (token.getService() != null) {
106      builder.setService(ByteString.copyFromUtf8(token.getService().toString()));
107    }
108    return builder.build();
109  }
110
111  /**
112   * Obtain and return an authentication token for the current user.
113   * @param conn The HBase cluster connection
114   * @return the authentication token instance
115   */
116  public static Token<AuthenticationTokenIdentifier> obtainToken(
117      final Connection conn, User user) throws IOException, InterruptedException {
118    return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
119      @Override
120      public Token<AuthenticationTokenIdentifier> run() throws Exception {
121        return obtainToken(conn);
122      }
123    });
124  }
125
126
127  private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
128      throws IOException {
129    return token.getService() != null
130        ? token.getService() : new Text("default");
131  }
132
133  /**
134   * Obtain an authentication token for the given user and add it to the
135   * user's credentials.
136   * @param conn The HBase cluster connection
137   * @param user The user for whom to obtain the token
138   * @throws IOException If making a remote call to the authentication service fails
139   * @throws InterruptedException If executing as the given user is interrupted
140   */
141  public static void obtainAndCacheToken(final Connection conn,
142      User user)
143      throws IOException, InterruptedException {
144    try {
145      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
146
147      if (token == null) {
148        throw new IOException("No token returned for user " + user.getName());
149      }
150      if (LOG.isDebugEnabled()) {
151        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
152            user.getName());
153      }
154      user.addToken(token);
155    } catch (IOException ioe) {
156      throw ioe;
157    } catch (InterruptedException ie) {
158      throw ie;
159    } catch (RuntimeException re) {
160      throw re;
161    } catch (Exception e) {
162      throw new UndeclaredThrowableException(e,
163          "Unexpected exception obtaining token for user " + user.getName());
164    }
165  }
166
167  /**
168   * Obtain an authentication token on behalf of the given user and add it to
169   * the credentials for the given map reduce job.
170   * @param conn The HBase cluster connection
171   * @param user The user for whom to obtain the token
172   * @param job The job instance in which the token should be stored
173   * @throws IOException If making a remote call to the authentication service fails
174   * @throws InterruptedException If executing as the given user is interrupted
175   */
176  public static void obtainTokenForJob(final Connection conn,
177      User user, Job job)
178      throws IOException, InterruptedException {
179    try {
180      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
181
182      if (token == null) {
183        throw new IOException("No token returned for user " + user.getName());
184      }
185      Text clusterId = getClusterId(token);
186      if (LOG.isDebugEnabled()) {
187        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
188            user.getName() + " on cluster " + clusterId.toString());
189      }
190      job.getCredentials().addToken(clusterId, token);
191    } catch (IOException ioe) {
192      throw ioe;
193    } catch (InterruptedException ie) {
194      throw ie;
195    } catch (RuntimeException re) {
196      throw re;
197    } catch (Exception e) {
198      throw new UndeclaredThrowableException(e,
199          "Unexpected exception obtaining token for user " + user.getName());
200    }
201  }
202
203  /**
204   * Obtain an authentication token on behalf of the given user and add it to
205   * the credentials for the given map reduce job.
206   * @param conn The HBase cluster connection
207   * @param user The user for whom to obtain the token
208   * @param job The job configuration in which the token should be stored
209   * @throws IOException If making a remote call to the authentication service fails
210   * @throws InterruptedException If executing as the given user is interrupted
211   */
212  public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
213      throws IOException, InterruptedException {
214    try {
215      Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
216
217      if (token == null) {
218        throw new IOException("No token returned for user " + user.getName());
219      }
220      Text clusterId = getClusterId(token);
221      if (LOG.isDebugEnabled()) {
222        LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
223            user.getName() + " on cluster " + clusterId.toString());
224      }
225      job.getCredentials().addToken(clusterId, token);
226    } catch (IOException ioe) {
227      throw ioe;
228    } catch (InterruptedException ie) {
229      throw ie;
230    } catch (RuntimeException re) {
231      throw re;
232    } catch (Exception e) {
233      throw new UndeclaredThrowableException(e,
234          "Unexpected exception obtaining token for user "+user.getName());
235    }
236  }
237
238  /**
239   * Checks for an authentication token for the given user, obtaining a new token if necessary,
240   * and adds it to the credentials for the given map reduce job.
241   *
242   * @param conn The HBase cluster connection
243   * @param user The user for whom to obtain the token
244   * @param job The job configuration in which the token should be stored
245   * @throws IOException If making a remote call to the authentication service fails
246   * @throws InterruptedException If executing as the given user is interrupted
247   */
248  public static void addTokenForJob(final Connection conn, final JobConf job, User user)
249      throws IOException, InterruptedException {
250
251    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
252    if (token == null) {
253      token = obtainToken(conn, user);
254    }
255    job.getCredentials().addToken(token.getService(), token);
256  }
257
258  /**
259   * Checks for an authentication token for the given user, obtaining a new token if necessary,
260   * and adds it to the credentials for the given map reduce job.
261   *
262   * @param conn The HBase cluster connection
263   * @param user The user for whom to obtain the token
264   * @param job The job instance in which the token should be stored
265   * @throws IOException If making a remote call to the authentication service fails
266   * @throws InterruptedException If executing as the given user is interrupted
267   */
268  public static void addTokenForJob(final Connection conn, User user, Job job)
269      throws IOException, InterruptedException {
270    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
271    if (token == null) {
272      token = obtainToken(conn, user);
273    }
274    job.getCredentials().addToken(token.getService(), token);
275  }
276
277  /**
278   * Checks if an authentication tokens exists for the connected cluster,
279   * obtaining one if needed and adding it to the user's credentials.
280   *
281   * @param conn The HBase cluster connection
282   * @param user The user for whom to obtain the token
283   * @throws IOException If making a remote call to the authentication service fails
284   * @throws InterruptedException If executing as the given user is interrupted
285   * @return true if the token was added, false if it already existed
286   */
287  public static boolean addTokenIfMissing(Connection conn, User user)
288      throws IOException, InterruptedException {
289    Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
290    if (token == null) {
291      token = obtainToken(conn, user);
292      user.getUGI().addToken(token.getService(), token);
293      return true;
294    }
295    return false;
296  }
297
298  /**
299   * Get the authentication token of the user for the cluster specified in the configuration
300   * @return null if the user does not have the token, otherwise the auth token for the cluster.
301   */
302  private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
303      throws IOException, InterruptedException {
304    ZKWatcher zkw = new ZKWatcher(conf, "TokenUtil-getAuthToken", null);
305    try {
306      String clusterId = ZKClusterId.readClusterIdZNode(zkw);
307      if (clusterId == null) {
308        throw new IOException("Failed to get cluster ID");
309      }
310      return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
311    } catch (KeeperException e) {
312      throw new IOException(e);
313    } finally {
314      zkw.close();
315    }
316  }
317
318  /**
319   * Converts a protobuf Token message back into a Token instance.
320   *
321   * @param proto the protobuf Token message
322   * @return the Token instance
323   */
324  public static Token<AuthenticationTokenIdentifier> toToken(AuthenticationProtos.Token proto) {
325    return new Token<>(
326        proto.hasIdentifier() ? proto.getIdentifier().toByteArray() : null,
327        proto.hasPassword() ? proto.getPassword().toByteArray() : null,
328        AuthenticationTokenIdentifier.AUTH_TOKEN_TYPE,
329        proto.hasService() ? new Text(proto.getService().toStringUtf8()) : null);
330  }
331}