View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.token;
20  
21  import java.io.IOException;
22  import java.lang.reflect.UndeclaredThrowableException;
23  import java.security.PrivilegedExceptionAction;
24  
25  import com.google.protobuf.ServiceException;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.classification.InterfaceStability;
33  import org.apache.hadoop.hbase.client.Connection;
34  import org.apache.hadoop.hbase.client.ConnectionFactory;
35  import org.apache.hadoop.hbase.client.Table;
36  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
39  import org.apache.hadoop.hbase.security.User;
40  import org.apache.hadoop.hbase.security.UserProvider;
41  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
42  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
43  import org.apache.hadoop.io.Text;
44  import org.apache.hadoop.mapred.JobConf;
45  import org.apache.hadoop.mapreduce.Job;
46  import org.apache.hadoop.security.UserGroupInformation;
47  import org.apache.hadoop.security.token.Token;
48  import org.apache.zookeeper.KeeperException;
49  
50  /**
51   * Utility methods for obtaining authentication tokens.
52   */
53  @InterfaceAudience.Public
54  @InterfaceStability.Evolving
55  public class TokenUtil {
56    // This class is referenced indirectly by User out in common; instances are created by reflection
57    private static final Log LOG = LogFactory.getLog(TokenUtil.class);
58  
59    /**
60     * Obtain and return an authentication token for the current user.
61     * @param conf the configuration for connecting to the cluster
62     * @return the authentication token instance
63     * @deprecated Replaced by {@link #obtainToken(Connection)}
64     */
65    @Deprecated
66    public static Token<AuthenticationTokenIdentifier> obtainToken(
67        Configuration conf) throws IOException {
68      try (Connection connection = ConnectionFactory.createConnection(conf)) {
69        return obtainToken(connection);
70      }
71    }
72  
73    /**
74     * Obtain and return an authentication token for the current user.
75     * @param conn The HBase cluster connection
76     * @return the authentication token instance
77     */
78    public static Token<AuthenticationTokenIdentifier> obtainToken(
79        Connection conn) throws IOException {
80      Table meta = null;
81      try {
82        meta = conn.getTable(TableName.META_TABLE_NAME);
83        CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
84        AuthenticationProtos.AuthenticationService.BlockingInterface service =
85            AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
86        AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
87            AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
88  
89        return ProtobufUtil.toToken(response.getToken());
90      } catch (ServiceException se) {
91        ProtobufUtil.toIOException(se);
92      } finally {
93        if (meta != null) {
94          meta.close();
95        }
96      }
97      // dummy return for ServiceException block
98      return null;
99    }
100 
101   /**
102    * Obtain and return an authentication token for the current user.
103    * @param conn The HBase cluster connection
104    * @return the authentication token instance
105    */
106   public static Token<AuthenticationTokenIdentifier> obtainToken(
107       final Connection conn, User user) throws IOException, InterruptedException {
108     return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
109       @Override
110       public Token<AuthenticationTokenIdentifier> run() throws Exception {
111         return obtainToken(conn);
112       }
113     });
114   }
115 
116 
117   private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
118       throws IOException {
119     return token.getService() != null
120         ? token.getService() : new Text("default");
121   }
122 
123   /**
124    * Obtain an authentication token for the given user and add it to the
125    * user's credentials.
126    * @param conf The configuration for connecting to the cluster
127    * @param user The user for whom to obtain the token
128    * @throws IOException If making a remote call to the authentication service fails
129    * @throws InterruptedException If executing as the given user is interrupted
130    * @deprecated Replaced by {@link #obtainAndCacheToken(Connection,User)}
131    */
132   @Deprecated
133   public static void obtainAndCacheToken(final Configuration conf,
134                                          UserGroupInformation user)
135       throws IOException, InterruptedException {
136     Connection conn = ConnectionFactory.createConnection(conf);
137     try {
138       UserProvider userProvider = UserProvider.instantiate(conf);
139       obtainAndCacheToken(conn, userProvider.create(user));
140     } finally {
141       conn.close();
142     }
143   }
144 
145   /**
146    * Obtain an authentication token for the given user and add it to the
147    * user's credentials.
148    * @param conn The HBase cluster connection
149    * @param user The user for whom to obtain the token
150    * @throws IOException If making a remote call to the authentication service fails
151    * @throws InterruptedException If executing as the given user is interrupted
152    */
153   public static void obtainAndCacheToken(final Connection conn,
154       User user)
155       throws IOException, InterruptedException {
156     try {
157       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
158 
159       if (token == null) {
160         throw new IOException("No token returned for user " + user.getName());
161       }
162       if (LOG.isDebugEnabled()) {
163         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
164             user.getName());
165       }
166       user.addToken(token);
167     } catch (IOException ioe) {
168       throw ioe;
169     } catch (InterruptedException ie) {
170       throw ie;
171     } catch (RuntimeException re) {
172       throw re;
173     } catch (Exception e) {
174       throw new UndeclaredThrowableException(e,
175           "Unexpected exception obtaining token for user " + user.getName());
176     }
177   }
178 
179   /**
180    * Obtain an authentication token on behalf of the given user and add it to
181    * the credentials for the given map reduce job.
182    * @param conf The configuration for connecting to the cluster
183    * @param user The user for whom to obtain the token
184    * @param job The job instance in which the token should be stored
185    * @throws IOException If making a remote call to the authentication service fails
186    * @throws InterruptedException If executing as the given user is interrupted
187    * @deprecated Replaced by {@link #obtainTokenForJob(Connection,User,Job)}
188    */
189   @Deprecated
190   public static void obtainTokenForJob(final Configuration conf,
191                                        UserGroupInformation user, Job job)
192       throws IOException, InterruptedException {
193     Connection conn = ConnectionFactory.createConnection(conf);
194     try {
195       UserProvider userProvider = UserProvider.instantiate(conf);
196       obtainTokenForJob(conn, userProvider.create(user), job);
197     } finally {
198       conn.close();
199     }
200   }
201 
202   /**
203    * Obtain an authentication token on behalf of the given user and add it to
204    * the credentials for the given map reduce job.
205    * @param conn The HBase cluster connection
206    * @param user The user for whom to obtain the token
207    * @param job The job instance in which the token should be stored
208    * @throws IOException If making a remote call to the authentication service fails
209    * @throws InterruptedException If executing as the given user is interrupted
210    */
211   public static void obtainTokenForJob(final Connection conn,
212       User user, Job job)
213       throws IOException, InterruptedException {
214     try {
215       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
216 
217       if (token == null) {
218         throw new IOException("No token returned for user " + user.getName());
219       }
220       Text clusterId = getClusterId(token);
221       if (LOG.isDebugEnabled()) {
222         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
223             user.getName() + " on cluster " + clusterId.toString());
224       }
225       job.getCredentials().addToken(clusterId, token);
226     } catch (IOException ioe) {
227       throw ioe;
228     } catch (InterruptedException ie) {
229       throw ie;
230     } catch (RuntimeException re) {
231       throw re;
232     } catch (Exception e) {
233       throw new UndeclaredThrowableException(e,
234           "Unexpected exception obtaining token for user " + user.getName());
235     }
236   }
237 
238   /**
239    * Obtain an authentication token on behalf of the given user and add it to
240    * the credentials for the given map reduce job.
241    * @param user The user for whom to obtain the token
242    * @param job The job configuration in which the token should be stored
243    * @throws IOException If making a remote call to the authentication service fails
244    * @throws InterruptedException If executing as the given user is interrupted
245    * @deprecated Replaced by {@link #obtainTokenForJob(Connection,JobConf,User)}
246    */
247   @Deprecated
248   public static void obtainTokenForJob(final JobConf job,
249                                        UserGroupInformation user)
250       throws IOException, InterruptedException {
251     Connection conn = ConnectionFactory.createConnection(job);
252     try {
253       UserProvider userProvider = UserProvider.instantiate(job);
254       obtainTokenForJob(conn, job, userProvider.create(user));
255     } finally {
256       conn.close();
257     }
258   }
259 
260   /**
261    * Obtain an authentication token on behalf of the given user and add it to
262    * the credentials for the given map reduce job.
263    * @param conn The HBase cluster connection
264    * @param user The user for whom to obtain the token
265    * @param job The job configuration in which the token should be stored
266    * @throws IOException If making a remote call to the authentication service fails
267    * @throws InterruptedException If executing as the given user is interrupted
268    */
269   public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
270       throws IOException, InterruptedException {
271     try {
272       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
273 
274       if (token == null) {
275         throw new IOException("No token returned for user " + user.getName());
276       }
277       Text clusterId = getClusterId(token);
278       if (LOG.isDebugEnabled()) {
279         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
280             user.getName() + " on cluster " + clusterId.toString());
281       }
282       job.getCredentials().addToken(clusterId, token);
283     } catch (IOException ioe) {
284       throw ioe;
285     } catch (InterruptedException ie) {
286       throw ie;
287     } catch (RuntimeException re) {
288       throw re;
289     } catch (Exception e) {
290       throw new UndeclaredThrowableException(e,
291           "Unexpected exception obtaining token for user "+user.getName());
292     }
293   }
294 
295   /**
296    * Checks for an authentication token for the given user, obtaining a new token if necessary,
297    * and adds it to the credentials for the given map reduce job.
298    *
299    * @param conn The HBase cluster connection
300    * @param user The user for whom to obtain the token
301    * @param job The job configuration in which the token should be stored
302    * @throws IOException If making a remote call to the authentication service fails
303    * @throws InterruptedException If executing as the given user is interrupted
304    */
305   public static void addTokenForJob(final Connection conn, final JobConf job, User user)
306       throws IOException, InterruptedException {
307 
308     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
309     if (token == null) {
310       token = obtainToken(conn, user);
311     }
312     job.getCredentials().addToken(token.getService(), token);
313   }
314 
315   /**
316    * Checks for an authentication token for the given user, obtaining a new token if necessary,
317    * and adds it to the credentials for the given map reduce job.
318    *
319    * @param conn The HBase cluster connection
320    * @param user The user for whom to obtain the token
321    * @param job The job instance in which the token should be stored
322    * @throws IOException If making a remote call to the authentication service fails
323    * @throws InterruptedException If executing as the given user is interrupted
324    */
325   public static void addTokenForJob(final Connection conn, User user, Job job)
326       throws IOException, InterruptedException {
327     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
328     if (token == null) {
329       token = obtainToken(conn, user);
330     }
331     job.getCredentials().addToken(token.getService(), token);
332   }
333 
334   /**
335    * Checks if an authentication tokens exists for the connected cluster,
336    * obtaining one if needed and adding it to the user's credentials.
337    *
338    * @param conn The HBase cluster connection
339    * @param user The user for whom to obtain the token
340    * @throws IOException If making a remote call to the authentication service fails
341    * @throws InterruptedException If executing as the given user is interrupted
342    * @return true if the token was added, false if it already existed
343    */
344   public static boolean addTokenIfMissing(Connection conn, User user)
345       throws IOException, InterruptedException {
346     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
347     if (token == null) {
348       token = obtainToken(conn, user);
349       user.getUGI().addToken(token.getService(), token);
350       return true;
351     }
352     return false;
353   }
354 
355   /**
356    * Get the authentication token of the user for the cluster specified in the configuration
357    * @return null if the user does not have the token, otherwise the auth token for the cluster.
358    */
359   private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
360       throws IOException, InterruptedException {
361     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
362     try {
363       String clusterId = ZKClusterId.readClusterIdZNode(zkw);
364       if (clusterId == null) {
365         throw new IOException("Failed to get cluster ID");
366       }
367       return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
368     } catch (KeeperException e) {
369       throw new IOException(e);
370     } finally {
371       zkw.close();
372     }
373   }
374 }