001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.token;
019
020import java.io.IOException;
021import java.util.Iterator;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.atomic.AtomicLong;
025import javax.crypto.SecretKey;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Stoppable;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
030import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
031import org.apache.hadoop.hbase.zookeeper.ZKLeaderManager;
032import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
033import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
034import org.apache.hadoop.io.Text;
035import org.apache.hadoop.security.token.SecretManager;
036import org.apache.hadoop.security.token.Token;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.zookeeper.KeeperException;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Manages an internal list of secret keys used to sign new authentication tokens as they are
044 * generated, and to valid existing tokens used for authentication.
045 * <p>
046 * A single instance of {@code AuthenticationTokenSecretManager} will be running as the "leader" in
047 * a given HBase cluster. The leader is responsible for periodically generating new secret keys,
048 * which are then distributed to followers via ZooKeeper, and for expiring previously used secret
049 * keys that are no longer needed (as any tokens using them have expired).
050 * </p>
051 */
052@InterfaceAudience.Private
053public class AuthenticationTokenSecretManager extends SecretManager<AuthenticationTokenIdentifier> {
054
055  static final String NAME_PREFIX = "SecretManager-";
056
057  private static final Logger LOG = LoggerFactory.getLogger(AuthenticationTokenSecretManager.class);
058
059  private long lastKeyUpdate;
060  private long keyUpdateInterval;
061  private long tokenMaxLifetime;
062  private ZKSecretWatcher zkWatcher;
063  private LeaderElector leaderElector;
064  private ZKClusterId clusterId;
065
066  private Map<Integer, AuthenticationKey> allKeys = new ConcurrentHashMap<>();
067  private AuthenticationKey currentKey;
068
069  private int idSeq;
070  private AtomicLong tokenSeq = new AtomicLong();
071  private String name;
072
073  /**
074   * Create a new secret manager instance for generating keys.
075   * @param conf              Configuration to use
076   * @param zk                Connection to zookeeper for handling leader elections
077   * @param keyUpdateInterval Time (in milliseconds) between rolling a new master key for token
078   *                          signing
079   * @param tokenMaxLifetime  Maximum age (in milliseconds) before a token expires and is no longer
080   *                          valid
081   */
082  /*
083   * TODO: Restrict access to this constructor to make rogues instances more difficult. For the
084   * moment this class is instantiated from org.apache.hadoop.hbase.ipc.SecureServer so public
085   * access is needed.
086   */
087  public AuthenticationTokenSecretManager(Configuration conf, ZKWatcher zk, String serverName,
088    long keyUpdateInterval, long tokenMaxLifetime) {
089    this.zkWatcher = new ZKSecretWatcher(conf, zk, this);
090    this.keyUpdateInterval = keyUpdateInterval;
091    this.tokenMaxLifetime = tokenMaxLifetime;
092    this.leaderElector = new LeaderElector(zk, serverName);
093    this.name = NAME_PREFIX + serverName;
094    this.clusterId = new ZKClusterId(zk, zk);
095  }
096
097  public void start() {
098    try {
099      // populate any existing keys
100      this.zkWatcher.start();
101      // try to become leader
102      this.leaderElector.start();
103    } catch (KeeperException ke) {
104      LOG.error("ZooKeeper initialization failed", ke);
105    }
106  }
107
108  public void stop() {
109    this.leaderElector.stop("SecretManager stopping");
110  }
111
112  public boolean isMaster() {
113    return leaderElector.isMaster();
114  }
115
116  public String getName() {
117    return name;
118  }
119
120  @Override
121  protected synchronized byte[] createPassword(AuthenticationTokenIdentifier identifier) {
122    long now = EnvironmentEdgeManager.currentTime();
123    AuthenticationKey secretKey = currentKey;
124    identifier.setKeyId(secretKey.getKeyId());
125    identifier.setIssueDate(now);
126    identifier.setExpirationDate(now + tokenMaxLifetime);
127    identifier.setSequenceNumber(tokenSeq.getAndIncrement());
128    return createPassword(identifier.getBytes(), secretKey.getKey());
129  }
130
131  @Override
132  public byte[] retrievePassword(AuthenticationTokenIdentifier identifier) throws InvalidToken {
133    long now = EnvironmentEdgeManager.currentTime();
134    if (identifier.getExpirationDate() < now) {
135      throw new InvalidToken("Token has expired");
136    }
137    AuthenticationKey masterKey = allKeys.get(identifier.getKeyId());
138    if (masterKey == null) {
139      if (zkWatcher.getWatcher().isAborted()) {
140        LOG.error("ZKWatcher is abort");
141        throw new InvalidToken(
142          "Token keys could not be sync from zookeeper" + " because of ZKWatcher abort");
143      }
144      synchronized (this) {
145        if (!leaderElector.isAlive() || leaderElector.isStopped()) {
146          LOG.warn("Thread leaderElector[" + leaderElector.getName() + ":" + leaderElector.getId()
147            + "] is stopped or not alive");
148          leaderElector.start();
149          LOG.info("Thread leaderElector [" + leaderElector.getName() + ":" + leaderElector.getId()
150            + "] is started");
151        }
152      }
153      zkWatcher.refreshKeys();
154      if (LOG.isDebugEnabled()) {
155        LOG.debug("Sync token keys from zookeeper");
156      }
157      masterKey = allKeys.get(identifier.getKeyId());
158    }
159    if (masterKey == null) {
160      throw new InvalidToken("Unknown master key for token (id=" + identifier.getKeyId() + ")");
161    }
162    // regenerate the password
163    return createPassword(identifier.getBytes(), masterKey.getKey());
164  }
165
166  @Override
167  public AuthenticationTokenIdentifier createIdentifier() {
168    return new AuthenticationTokenIdentifier();
169  }
170
171  public Token<AuthenticationTokenIdentifier> generateToken(String username) {
172    AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier(username);
173    Token<AuthenticationTokenIdentifier> token = new Token<>(ident, this);
174    if (clusterId.hasId()) {
175      token.setService(new Text(clusterId.getId()));
176    }
177    return token;
178  }
179
180  public synchronized void addKey(AuthenticationKey key) throws IOException {
181    // ignore zk changes when running as master
182    if (leaderElector.isMaster()) {
183      LOG.debug("Running as master, ignoring new key {}", key);
184      return;
185    }
186
187    LOG.debug("Adding key {}", key.getKeyId());
188
189    allKeys.put(key.getKeyId(), key);
190    if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) {
191      currentKey = key;
192    }
193    // update current sequence
194    if (key.getKeyId() > idSeq) {
195      idSeq = key.getKeyId();
196    }
197  }
198
199  synchronized boolean removeKey(Integer keyId) {
200    // ignore zk changes when running as master
201    if (leaderElector.isMaster()) {
202      LOG.debug("Running as master, ignoring removed keyid={}", keyId);
203      return false;
204    }
205
206    if (LOG.isDebugEnabled()) {
207      LOG.debug("Removing keyid={}", keyId);
208    }
209
210    allKeys.remove(keyId);
211    return true;
212  }
213
214  synchronized AuthenticationKey getCurrentKey() {
215    return currentKey;
216  }
217
218  AuthenticationKey getKey(int keyId) {
219    return allKeys.get(keyId);
220  }
221
222  synchronized void removeExpiredKeys() {
223    if (!leaderElector.isMaster()) {
224      LOG.info("Skipping removeExpiredKeys() because not running as master.");
225      return;
226    }
227
228    long now = EnvironmentEdgeManager.currentTime();
229    Iterator<AuthenticationKey> iter = allKeys.values().iterator();
230    while (iter.hasNext()) {
231      AuthenticationKey key = iter.next();
232      if (key.getExpiration() < now) {
233        LOG.debug("Removing expired key {}", key);
234        iter.remove();
235        zkWatcher.removeKeyFromZK(key);
236      }
237    }
238  }
239
240  synchronized boolean isCurrentKeyRolled() {
241    return currentKey != null;
242  }
243
244  synchronized void rollCurrentKey() {
245    if (!leaderElector.isMaster()) {
246      LOG.info("Skipping rollCurrentKey() because not running as master.");
247      return;
248    }
249
250    long now = EnvironmentEdgeManager.currentTime();
251    AuthenticationKey prev = currentKey;
252    AuthenticationKey newKey = new AuthenticationKey(++idSeq, Long.MAX_VALUE, // don't allow to
253                                                                              // expire until it's
254                                                                              // replaced by a new
255                                                                              // key
256      generateSecret());
257    allKeys.put(newKey.getKeyId(), newKey);
258    currentKey = newKey;
259    zkWatcher.addKeyToZK(newKey);
260    lastKeyUpdate = now;
261
262    if (prev != null) {
263      // make sure previous key is still stored
264      prev.setExpiration(now + tokenMaxLifetime);
265      allKeys.put(prev.getKeyId(), prev);
266      zkWatcher.updateKeyInZK(prev);
267    }
268  }
269
270  synchronized long getLastKeyUpdate() {
271    return lastKeyUpdate;
272  }
273
274  public static SecretKey createSecretKey(byte[] raw) {
275    return SecretManager.createSecretKey(raw);
276  }
277
278  private class LeaderElector extends Thread implements Stoppable {
279    private boolean stopped = false;
280    /** Flag indicating whether we're in charge of rolling/expiring keys */
281    private boolean isMaster = false;
282    private ZKLeaderManager zkLeader;
283
284    public LeaderElector(ZKWatcher watcher, String serverName) {
285      setDaemon(true);
286      setName("ZKSecretWatcher-leaderElector");
287      zkLeader =
288        new ZKLeaderManager(watcher, ZNodePaths.joinZNode(zkWatcher.getRootKeyZNode(), "keymaster"),
289          Bytes.toBytes(serverName), this);
290    }
291
292    public boolean isMaster() {
293      return isMaster;
294    }
295
296    @Override
297    public boolean isStopped() {
298      return stopped;
299    }
300
301    @Override
302    public void stop(String reason) {
303      if (stopped) {
304        return;
305      }
306
307      stopped = true;
308      // prevent further key generation when stopping
309      if (isMaster) {
310        zkLeader.stepDownAsLeader();
311      }
312      isMaster = false;
313      LOG.info("Stopping leader election, because: " + reason);
314      interrupt();
315    }
316
317    @Override
318    public void run() {
319      zkLeader.start();
320      zkLeader.waitToBecomeLeader();
321      isMaster = true;
322
323      while (!stopped) {
324        long now = EnvironmentEdgeManager.currentTime();
325
326        // clear any expired
327        removeExpiredKeys();
328        long localLastKeyUpdate = getLastKeyUpdate();
329        if (localLastKeyUpdate + keyUpdateInterval < now) {
330          // roll a new master key
331          rollCurrentKey();
332        }
333
334        try {
335          Thread.sleep(5000);
336        } catch (InterruptedException ie) {
337          if (LOG.isDebugEnabled()) {
338            LOG.debug("Interrupted waiting for next update", ie);
339          }
340        }
341      }
342    }
343  }
344}