View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.token;
20  
21  import java.io.IOException;
22  import java.lang.reflect.UndeclaredThrowableException;
23  import java.security.PrivilegedExceptionAction;
24  
25  import com.google.protobuf.ServiceException;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.classification.InterfaceStability;
33  import org.apache.hadoop.hbase.client.Connection;
34  import org.apache.hadoop.hbase.client.ConnectionFactory;
35  import org.apache.hadoop.hbase.client.Table;
36  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
39  import org.apache.hadoop.hbase.security.User;
40  import org.apache.hadoop.hbase.security.UserProvider;
41  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
42  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
43  import org.apache.hadoop.io.Text;
44  import org.apache.hadoop.mapred.JobConf;
45  import org.apache.hadoop.mapreduce.Job;
46  import org.apache.hadoop.security.UserGroupInformation;
47  import org.apache.hadoop.security.token.Token;
48  import org.apache.zookeeper.KeeperException;
49  
50  /**
51   * Utility methods for obtaining authentication tokens.
52   */
53  @InterfaceAudience.Public
54  @InterfaceStability.Evolving
55  public class TokenUtil {
56    // This class is referenced indirectly by User out in common; instances are created by reflection
57    private static final Log LOG = LogFactory.getLog(TokenUtil.class);
58  
59    /**
60     * Obtain and return an authentication token for the current user.
61     * @param conn The HBase cluster connection
62     * @return the authentication token instance
63     */
64    public static Token<AuthenticationTokenIdentifier> obtainToken(
65        Connection conn) throws IOException {
66      Table meta = null;
67      try {
68        meta = conn.getTable(TableName.META_TABLE_NAME);
69        CoprocessorRpcChannel rpcChannel = meta.coprocessorService(HConstants.EMPTY_START_ROW);
70        AuthenticationProtos.AuthenticationService.BlockingInterface service =
71            AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
72        AuthenticationProtos.GetAuthenticationTokenResponse response = service.getAuthenticationToken(null,
73            AuthenticationProtos.GetAuthenticationTokenRequest.getDefaultInstance());
74  
75        return ProtobufUtil.toToken(response.getToken());
76      } catch (ServiceException se) {
77        ProtobufUtil.toIOException(se);
78      } finally {
79        if (meta != null) {
80          meta.close();
81        }
82      }
83      // dummy return for ServiceException block
84      return null;
85    }
86  
87    /**
88     * Obtain and return an authentication token for the current user.
89     * @param conn The HBase cluster connection
90     * @return the authentication token instance
91     */
92    public static Token<AuthenticationTokenIdentifier> obtainToken(
93        final Connection conn, User user) throws IOException, InterruptedException {
94      return user.runAs(new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() {
95        @Override
96        public Token<AuthenticationTokenIdentifier> run() throws Exception {
97          return obtainToken(conn);
98        }
99      });
100   }
101 
102 
103   private static Text getClusterId(Token<AuthenticationTokenIdentifier> token)
104       throws IOException {
105     return token.getService() != null
106         ? token.getService() : new Text("default");
107   }
108 
109   /**
110    * Obtain an authentication token for the given user and add it to the
111    * user's credentials.
112    * @param conn The HBase cluster connection
113    * @param user The user for whom to obtain the token
114    * @throws IOException If making a remote call to the authentication service fails
115    * @throws InterruptedException If executing as the given user is interrupted
116    */
117   public static void obtainAndCacheToken(final Connection conn,
118       User user)
119       throws IOException, InterruptedException {
120     try {
121       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
122 
123       if (token == null) {
124         throw new IOException("No token returned for user " + user.getName());
125       }
126       if (LOG.isDebugEnabled()) {
127         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
128             user.getName());
129       }
130       user.addToken(token);
131     } catch (IOException ioe) {
132       throw ioe;
133     } catch (InterruptedException ie) {
134       throw ie;
135     } catch (RuntimeException re) {
136       throw re;
137     } catch (Exception e) {
138       throw new UndeclaredThrowableException(e,
139           "Unexpected exception obtaining token for user " + user.getName());
140     }
141   }
142 
143   /**
144    * Obtain an authentication token on behalf of the given user and add it to
145    * the credentials for the given map reduce job.
146    * @param conn The HBase cluster connection
147    * @param user The user for whom to obtain the token
148    * @param job The job instance in which the token should be stored
149    * @throws IOException If making a remote call to the authentication service fails
150    * @throws InterruptedException If executing as the given user is interrupted
151    */
152   public static void obtainTokenForJob(final Connection conn,
153       User user, Job job)
154       throws IOException, InterruptedException {
155     try {
156       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
157 
158       if (token == null) {
159         throw new IOException("No token returned for user " + user.getName());
160       }
161       Text clusterId = getClusterId(token);
162       if (LOG.isDebugEnabled()) {
163         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
164             user.getName() + " on cluster " + clusterId.toString());
165       }
166       job.getCredentials().addToken(clusterId, token);
167     } catch (IOException ioe) {
168       throw ioe;
169     } catch (InterruptedException ie) {
170       throw ie;
171     } catch (RuntimeException re) {
172       throw re;
173     } catch (Exception e) {
174       throw new UndeclaredThrowableException(e,
175           "Unexpected exception obtaining token for user " + user.getName());
176     }
177   }
178 
179   /**
180    * Obtain an authentication token on behalf of the given user and add it to
181    * the credentials for the given map reduce job.
182    * @param conn The HBase cluster connection
183    * @param user The user for whom to obtain the token
184    * @param job The job configuration in which the token should be stored
185    * @throws IOException If making a remote call to the authentication service fails
186    * @throws InterruptedException If executing as the given user is interrupted
187    */
188   public static void obtainTokenForJob(final Connection conn, final JobConf job, User user)
189       throws IOException, InterruptedException {
190     try {
191       Token<AuthenticationTokenIdentifier> token = obtainToken(conn, user);
192 
193       if (token == null) {
194         throw new IOException("No token returned for user " + user.getName());
195       }
196       Text clusterId = getClusterId(token);
197       if (LOG.isDebugEnabled()) {
198         LOG.debug("Obtained token " + token.getKind().toString() + " for user " +
199             user.getName() + " on cluster " + clusterId.toString());
200       }
201       job.getCredentials().addToken(clusterId, token);
202     } catch (IOException ioe) {
203       throw ioe;
204     } catch (InterruptedException ie) {
205       throw ie;
206     } catch (RuntimeException re) {
207       throw re;
208     } catch (Exception e) {
209       throw new UndeclaredThrowableException(e,
210           "Unexpected exception obtaining token for user "+user.getName());
211     }
212   }
213 
214   /**
215    * Checks for an authentication token for the given user, obtaining a new token if necessary,
216    * and adds it to the credentials for the given map reduce job.
217    *
218    * @param conn The HBase cluster connection
219    * @param user The user for whom to obtain the token
220    * @param job The job configuration in which the token should be stored
221    * @throws IOException If making a remote call to the authentication service fails
222    * @throws InterruptedException If executing as the given user is interrupted
223    */
224   public static void addTokenForJob(final Connection conn, final JobConf job, User user)
225       throws IOException, InterruptedException {
226 
227     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
228     if (token == null) {
229       token = obtainToken(conn, user);
230     }
231     job.getCredentials().addToken(token.getService(), token);
232   }
233 
234   /**
235    * Checks for an authentication token for the given user, obtaining a new token if necessary,
236    * and adds it to the credentials for the given map reduce job.
237    *
238    * @param conn The HBase cluster connection
239    * @param user The user for whom to obtain the token
240    * @param job The job instance in which the token should be stored
241    * @throws IOException If making a remote call to the authentication service fails
242    * @throws InterruptedException If executing as the given user is interrupted
243    */
244   public static void addTokenForJob(final Connection conn, User user, Job job)
245       throws IOException, InterruptedException {
246     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
247     if (token == null) {
248       token = obtainToken(conn, user);
249     }
250     job.getCredentials().addToken(token.getService(), token);
251   }
252 
253   /**
254    * Checks if an authentication tokens exists for the connected cluster,
255    * obtaining one if needed and adding it to the user's credentials.
256    *
257    * @param conn The HBase cluster connection
258    * @param user The user for whom to obtain the token
259    * @throws IOException If making a remote call to the authentication service fails
260    * @throws InterruptedException If executing as the given user is interrupted
261    * @return true if the token was added, false if it already existed
262    */
263   public static boolean addTokenIfMissing(Connection conn, User user)
264       throws IOException, InterruptedException {
265     Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
266     if (token == null) {
267       token = obtainToken(conn, user);
268       user.getUGI().addToken(token.getService(), token);
269       return true;
270     }
271     return false;
272   }
273 
274   /**
275    * Get the authentication token of the user for the cluster specified in the configuration
276    * @return null if the user does not have the token, otherwise the auth token for the cluster.
277    */
278   private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
279       throws IOException, InterruptedException {
280     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "TokenUtil-getAuthToken", null);
281     try {
282       String clusterId = ZKClusterId.readClusterIdZNode(zkw);
283       if (clusterId == null) {
284         throw new IOException("Failed to get cluster ID");
285       }
286       return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
287     } catch (KeeperException e) {
288       throw new IOException(e);
289     } finally {
290       zkw.close();
291     }
292   }
293 }