001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.token;
020
021import javax.crypto.SecretKey;
022import java.io.IOException;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.atomic.AtomicLong;
027
028import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Stoppable;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
035import org.apache.hadoop.hbase.zookeeper.ZKLeaderManager;
036import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
037import org.apache.hadoop.io.Text;
038import org.apache.hadoop.security.token.SecretManager;
039import org.apache.hadoop.security.token.Token;
040import org.apache.zookeeper.KeeperException;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Manages an internal list of secret keys used to sign new authentication
046 * tokens as they are generated, and to valid existing tokens used for
047 * authentication.
048 *
049 * <p>
050 * A single instance of {@code AuthenticationTokenSecretManager} will be
051 * running as the "leader" in a given HBase cluster.  The leader is responsible
052 * for periodically generating new secret keys, which are then distributed to
053 * followers via ZooKeeper, and for expiring previously used secret keys that
054 * are no longer needed (as any tokens using them have expired).
055 * </p>
056 */
057@InterfaceAudience.Private
058public class AuthenticationTokenSecretManager
059    extends SecretManager<AuthenticationTokenIdentifier> {
060
061  static final String NAME_PREFIX = "SecretManager-";
062
063  private static final Logger LOG = LoggerFactory.getLogger(
064      AuthenticationTokenSecretManager.class);
065
066  private long lastKeyUpdate;
067  private long keyUpdateInterval;
068  private long tokenMaxLifetime;
069  private ZKSecretWatcher zkWatcher;
070  private LeaderElector leaderElector;
071  private ZKClusterId clusterId;
072
073  private Map<Integer,AuthenticationKey> allKeys = new ConcurrentHashMap<>();
074  private AuthenticationKey currentKey;
075
076  private int idSeq;
077  private AtomicLong tokenSeq = new AtomicLong();
078  private String name;
079
080  /**
081   * Create a new secret manager instance for generating keys.
082   * @param conf Configuration to use
083   * @param zk Connection to zookeeper for handling leader elections
084   * @param keyUpdateInterval Time (in milliseconds) between rolling a new master key for token signing
085   * @param tokenMaxLifetime Maximum age (in milliseconds) before a token expires and is no longer valid
086   */
087  /* TODO: Restrict access to this constructor to make rogues instances more difficult.
088   * For the moment this class is instantiated from
089   * org.apache.hadoop.hbase.ipc.SecureServer so public access is needed.
090   */
091  public AuthenticationTokenSecretManager(Configuration conf,
092                                          ZKWatcher zk, String serverName,
093                                          long keyUpdateInterval, long tokenMaxLifetime) {
094    this.zkWatcher = new ZKSecretWatcher(conf, zk, this);
095    this.keyUpdateInterval = keyUpdateInterval;
096    this.tokenMaxLifetime = tokenMaxLifetime;
097    this.leaderElector = new LeaderElector(zk, serverName);
098    this.name = NAME_PREFIX+serverName;
099    this.clusterId = new ZKClusterId(zk, zk);
100  }
101
102  public void start() {
103    try {
104      // populate any existing keys
105      this.zkWatcher.start();
106      // try to become leader
107      this.leaderElector.start();
108    } catch (KeeperException ke) {
109      LOG.error("ZooKeeper initialization failed", ke);
110    }
111  }
112
113  public void stop() {
114    this.leaderElector.stop("SecretManager stopping");
115  }
116
117  public boolean isMaster() {
118    return leaderElector.isMaster();
119  }
120
121  public String getName() {
122    return name;
123  }
124
125  @Override
126  protected synchronized byte[] createPassword(AuthenticationTokenIdentifier identifier) {
127    long now = EnvironmentEdgeManager.currentTime();
128    AuthenticationKey secretKey = currentKey;
129    identifier.setKeyId(secretKey.getKeyId());
130    identifier.setIssueDate(now);
131    identifier.setExpirationDate(now + tokenMaxLifetime);
132    identifier.setSequenceNumber(tokenSeq.getAndIncrement());
133    return createPassword(identifier.getBytes(),
134        secretKey.getKey());
135  }
136
137  @Override
138  public byte[] retrievePassword(AuthenticationTokenIdentifier identifier)
139      throws InvalidToken {
140    long now = EnvironmentEdgeManager.currentTime();
141    if (identifier.getExpirationDate() < now) {
142      throw new InvalidToken("Token has expired");
143    }
144    AuthenticationKey masterKey = allKeys.get(identifier.getKeyId());
145    if(masterKey == null) {
146      if(zkWatcher.getWatcher().isAborted()) {
147        LOG.error("ZKWatcher is abort");
148        throw new InvalidToken("Token keys could not be sync from zookeeper"
149            + " because of ZKWatcher abort");
150      }
151      synchronized (this) {
152        if (!leaderElector.isAlive() || leaderElector.isStopped()) {
153          LOG.warn("Thread leaderElector[" + leaderElector.getName() + ":"
154              + leaderElector.getId() + "] is stopped or not alive");
155          leaderElector.start();
156          LOG.info("Thread leaderElector [" + leaderElector.getName() + ":"
157              + leaderElector.getId() + "] is started");
158        }
159      }
160      zkWatcher.refreshKeys();
161      if (LOG.isDebugEnabled()) {
162        LOG.debug("Sync token keys from zookeeper");
163      }
164      masterKey = allKeys.get(identifier.getKeyId());
165    }
166    if (masterKey == null) {
167      throw new InvalidToken("Unknown master key for token (id="+
168          identifier.getKeyId()+")");
169    }
170    // regenerate the password
171    return createPassword(identifier.getBytes(),
172        masterKey.getKey());
173  }
174
175  @Override
176  public AuthenticationTokenIdentifier createIdentifier() {
177    return new AuthenticationTokenIdentifier();
178  }
179
180  public Token<AuthenticationTokenIdentifier> generateToken(String username) {
181    AuthenticationTokenIdentifier ident =
182        new AuthenticationTokenIdentifier(username);
183    Token<AuthenticationTokenIdentifier> token = new Token<>(ident, this);
184    if (clusterId.hasId()) {
185      token.setService(new Text(clusterId.getId()));
186    }
187    return token;
188  }
189
190  public synchronized void addKey(AuthenticationKey key) throws IOException {
191    // ignore zk changes when running as master
192    if (leaderElector.isMaster()) {
193      LOG.debug("Running as master, ignoring new key {}", key);
194      return;
195    }
196
197    LOG.debug("Adding key {}", key.getKeyId());
198
199    allKeys.put(key.getKeyId(), key);
200    if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) {
201      currentKey = key;
202    }
203    // update current sequence
204    if (key.getKeyId() > idSeq) {
205      idSeq = key.getKeyId();
206    }
207  }
208
209  synchronized boolean removeKey(Integer keyId) {
210    // ignore zk changes when running as master
211    if (leaderElector.isMaster()) {
212      LOG.debug("Running as master, ignoring removed keyid={}", keyId);
213      return false;
214    }
215
216    if (LOG.isDebugEnabled()) {
217      LOG.debug("Removing keyid={}", keyId);
218    }
219
220    allKeys.remove(keyId);
221    return true;
222  }
223
224  synchronized AuthenticationKey getCurrentKey() {
225    return currentKey;
226  }
227
228  AuthenticationKey getKey(int keyId) {
229    return allKeys.get(keyId);
230  }
231
232  synchronized void removeExpiredKeys() {
233    if (!leaderElector.isMaster()) {
234      LOG.info("Skipping removeExpiredKeys() because not running as master.");
235      return;
236    }
237
238    long now = EnvironmentEdgeManager.currentTime();
239    Iterator<AuthenticationKey> iter = allKeys.values().iterator();
240    while (iter.hasNext()) {
241      AuthenticationKey key = iter.next();
242      if (key.getExpiration() < now) {
243        LOG.debug("Removing expired key {}", key);
244        iter.remove();
245        zkWatcher.removeKeyFromZK(key);
246      }
247    }
248  }
249
250  synchronized boolean isCurrentKeyRolled() {
251    return currentKey != null;
252  }
253
254  synchronized void rollCurrentKey() {
255    if (!leaderElector.isMaster()) {
256      LOG.info("Skipping rollCurrentKey() because not running as master.");
257      return;
258    }
259
260    long now = EnvironmentEdgeManager.currentTime();
261    AuthenticationKey prev = currentKey;
262    AuthenticationKey newKey = new AuthenticationKey(++idSeq,
263        Long.MAX_VALUE, // don't allow to expire until it's replaced by a new key
264        generateSecret());
265    allKeys.put(newKey.getKeyId(), newKey);
266    currentKey = newKey;
267    zkWatcher.addKeyToZK(newKey);
268    lastKeyUpdate = now;
269
270    if (prev != null) {
271      // make sure previous key is still stored
272      prev.setExpiration(now + tokenMaxLifetime);
273      allKeys.put(prev.getKeyId(), prev);
274      zkWatcher.updateKeyInZK(prev);
275    }
276  }
277
278  synchronized long getLastKeyUpdate() {
279    return lastKeyUpdate;
280  }
281
282  public static SecretKey createSecretKey(byte[] raw) {
283    return SecretManager.createSecretKey(raw);
284  }
285
286  private class LeaderElector extends Thread implements Stoppable {
287    private boolean stopped = false;
288    /** Flag indicating whether we're in charge of rolling/expiring keys */
289    private boolean isMaster = false;
290    private ZKLeaderManager zkLeader;
291
292    public LeaderElector(ZKWatcher watcher, String serverName) {
293      setDaemon(true);
294      setName("ZKSecretWatcher-leaderElector");
295      zkLeader = new ZKLeaderManager(watcher,
296          ZNodePaths.joinZNode(zkWatcher.getRootKeyZNode(), "keymaster"),
297          Bytes.toBytes(serverName), this);
298    }
299
300    public boolean isMaster() {
301      return isMaster;
302    }
303
304    @Override
305    public boolean isStopped() {
306      return stopped;
307    }
308
309    @Override
310    public void stop(String reason) {
311      if (stopped) {
312        return;
313      }
314
315      stopped = true;
316      // prevent further key generation when stopping
317      if (isMaster) {
318        zkLeader.stepDownAsLeader();
319      }
320      isMaster = false;
321      LOG.info("Stopping leader election, because: "+reason);
322      interrupt();
323    }
324
325    @Override
326    public void run() {
327      zkLeader.start();
328      zkLeader.waitToBecomeLeader();
329      isMaster = true;
330
331      while (!stopped) {
332        long now = EnvironmentEdgeManager.currentTime();
333
334        // clear any expired
335        removeExpiredKeys();
336        long localLastKeyUpdate = getLastKeyUpdate();
337        if (localLastKeyUpdate + keyUpdateInterval < now) {
338          // roll a new master key
339          rollCurrentKey();
340        }
341
342        try {
343          Thread.sleep(5000);
344        } catch (InterruptedException ie) {
345          if (LOG.isDebugEnabled()) {
346            LOG.debug("Interrupted waiting for next update", ie);
347          }
348        }
349      }
350    }
351  }
352}