View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.token;
20  
21  import java.io.IOException;
22  import java.lang.reflect.UndeclaredThrowableException;
23  import java.security.PrivilegedExceptionAction;
24  
25  import com.google.protobuf.ServiceException;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.classification.InterfaceStability;
33  import org.apache.hadoop.hbase.client.Connection;
34  import org.apache.hadoop.hbase.client.Table;
35  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
38  import org.apache.hadoop.hbase.security.User;
39  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
40  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41  import org.apache.hadoop.io.Text;
42  import org.apache.hadoop.mapred.JobConf;
43  import org.apache.hadoop.mapreduce.Job;
44  import org.apache.hadoop.security.token.Token;
45  import org.apache.zookeeper.KeeperException;
46  
47  /**
48   * Utility methods for obtaining authentication tokens.
49   */
50  @InterfaceAudience.Public
51  @InterfaceStability.Evolving
52  public class TokenUtil {
53    // This class is referenced indirectly by User out in common; instances are created by reflection
54    private static final Log LOG = LogFactory.getLog(TokenUtil.class);
55  
56    /**
57     * Obtain and return an authentication token for the current user.
58     * @param conn The HBase cluster connection
59     * @return the authentication token instance
60     */
61    public static Token<AuthenticationTokenIdentifier> obtainToken(
62        Connection conn) throws IOException {
63      Table meta = null;
64      try {
65        meta = conn.getTable(TableName.META_TABLE_NAME);
66        CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
67        AuthenticationProtos.AuthenticationService.BlockingInterface service =
68            AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
69        AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
70            AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
71  
72        return ProtobufUtil.toToken(response.getToken());
73      } catch (ServiceException se) {
74        ProtobufUtil.toIOException(se);
75      } finally {
76        if (meta != null) {
77          meta.close();
78        }
79      }
80      // dummy return for ServiceException block
81      return null;
82    }
83  
84    /**
85     * Obtain and return an authentication token for the current user.
86     * @param conn The HBase cluster connection
87     * @return the authentication token instance
88     */
89    public static Token<AuthenticationTokenIdentifier> obtainToken(
90        final Connection conn, User user) throws IOException, InterruptedException {
91      return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
92        @Override
93        public Token<AuthenticationTokenIdentifier> run() throws Exception {
94          return obtainToken(conn);
95        }
96      });
97    }
98  
99  
100   private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
101       throws IOException {
102     return token.getService() != null
103         ? token.getService() : new Text("default");
104   }
105 
106   /**
107    * Obtain an authentication token for the given user and add it to the
108    * user's credentials.
109    * @param conn The HBase cluster connection
110    * @param user The user for whom to obtain the token
111    * @throws IOException If making a remote call to the authentication service fails
112    * @throws InterruptedException If executing as the given user is interrupted
113    */
114   public static void obtainAndCacheToken(final Connection conn,
115       User user)
116       throws IOException, InterruptedException {
117     try {
118       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
119 
120       if (token == null) {
121         throw new IOException("No token returned for user " + user.getName());
122       }
123       if (LOG.isDebugEnabled()) {
124         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
125             user.getName());
126       }
127       user.addToken(token);
128     } catch (IOException ioe) {
129       throw ioe;
130     } catch (InterruptedException ie) {
131       throw ie;
132     } catch (RuntimeException re) {
133       throw re;
134     } catch (Exception e) {
135       throw new UndeclaredThrowableException(e,
136           "Unexpected exception obtaining token for user " + user.getName());
137     }
138   }
139 
140   /**
141    * Obtain an authentication token on behalf of the given user and add it to
142    * the credentials for the given map reduce job.
143    * @param conn The HBase cluster connection
144    * @param user The user for whom to obtain the token
145    * @param job The job instance in which the token should be stored
146    * @throws IOException If making a remote call to the authentication service fails
147    * @throws InterruptedException If executing as the given user is interrupted
148    */
149   public static void obtainTokenForJob(final Connection conn,
150       User user, Job job)
151       throws IOException, InterruptedException {
152     try {
153       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
154 
155       if (token == null) {
156         throw new IOException("No token returned for user " + user.getName());
157       }
158       Text clusterId = getClusterId(token);
159       if (LOG.isDebugEnabled()) {
160         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
161             user.getName() + " on cluster " + clusterId.toString());
162       }
163       job.getCredentials().addToken(clusterId, token);
164     } catch (IOException ioe) {
165       throw ioe;
166     } catch (InterruptedException ie) {
167       throw ie;
168     } catch (RuntimeException re) {
169       throw re;
170     } catch (Exception e) {
171       throw new UndeclaredThrowableException(e,
172           "Unexpected exception obtaining token for user " + user.getName());
173     }
174   }
175 
176   /**
177    * Obtain an authentication token on behalf of the given user and add it to
178    * the credentials for the given map reduce job.
179    * @param conn The HBase cluster connection
180    * @param user The user for whom to obtain the token
181    * @param job The job configuration in which the token should be stored
182    * @throws IOException If making a remote call to the authentication service fails
183    * @throws InterruptedException If executing as the given user is interrupted
184    */
185   public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
186       throws IOException, InterruptedException {
187     try {
188       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
189 
190       if (token == null) {
191         throw new IOException("No token returned for user " + user.getName());
192       }
193       Text clusterId = getClusterId(token);
194       if (LOG.isDebugEnabled()) {
195         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
196             user.getName() + " on cluster " + clusterId.toString());
197       }
198       job.getCredentials().addToken(clusterId, token);
199     } catch (IOException ioe) {
200       throw ioe;
201     } catch (InterruptedException ie) {
202       throw ie;
203     } catch (RuntimeException re) {
204       throw re;
205     } catch (Exception e) {
206       throw new UndeclaredThrowableException(e,
207           "Unexpected exception obtaining token for user "+user.getName());
208     }
209   }
210 
211   /**
212    * Checks for an authentication token for the given user, obtaining a new token if necessary,
213    * and adds it to the credentials for the given map reduce job.
214    *
215    * @param conn The HBase cluster connection
216    * @param user The user for whom to obtain the token
217    * @param job The job configuration in which the token should be stored
218    * @throws IOException If making a remote call to the authentication service fails
219    * @throws InterruptedException If executing as the given user is interrupted
220    */
221   public static void addTokenForJob(final Connection conn, final JobConf job, User user)
222       throws IOException, InterruptedException {
223 
224     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
225     if (token == null) {
226       token = obtainToken(conn, user);
227     }
228     job.getCredentials().addToken(token.getService(), token);
229   }
230 
231   /**
232    * Checks for an authentication token for the given user, obtaining a new token if necessary,
233    * and adds it to the credentials for the given map reduce job.
234    *
235    * @param conn The HBase cluster connection
236    * @param user The user for whom to obtain the token
237    * @param job The job instance in which the token should be stored
238    * @throws IOException If making a remote call to the authentication service fails
239    * @throws InterruptedException If executing as the given user is interrupted
240    */
241   public static void addTokenForJob(final Connection conn, User user, Job job)
242       throws IOException, InterruptedException {
243     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
244     if (token == null) {
245       token = obtainToken(conn, user);
246     }
247     job.getCredentials().addToken(token.getService(), token);
248   }
249 
250   /**
251    * Checks if an authentication tokens exists for the connected cluster,
252    * obtaining one if needed and adding it to the user's credentials.
253    *
254    * @param conn The HBase cluster connection
255    * @param user The user for whom to obtain the token
256    * @throws IOException If making a remote call to the authentication service fails
257    * @throws InterruptedException If executing as the given user is interrupted
258    * @return true if the token was added, false if it already existed
259    */
260   public static boolean addTokenIfMissing(Connection conn, User user)
261       throws IOException, InterruptedException {
262     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
263     if (token == null) {
264       token = obtainToken(conn, user);
265       user.getUGI().addToken(token.getService(), token);
266       return true;
267     }
268     return false;
269   }
270 
271   /**
272    * Get the authentication token of the user for the cluster specified in the configuration
273    * @return null if the user does not have the token, otherwise the auth token for the cluster.
274    */
275   private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
276       throws IOException, InterruptedException {
277     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
278     try {
279       String clusterId = ZKClusterId.readClusterIdZNode(zkw);
280       if (clusterId == null) {
281         throw new IOException("Failed to get cluster ID");
282       }
283       return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
284     } catch (KeeperException e) {
285       throw new IOException(e);
286     } finally {
287       zkw.close();
288     }
289   }
290 }