001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.security.token;
020
021import javax.crypto.SecretKey;
022import java.io.IOException;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.atomic.AtomicLong;
027
028import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Stoppable;
032import org.apache.hadoop.hbase.util.Bytes;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
035import org.apache.hadoop.hbase.zookeeper.ZKLeaderManager;
036import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
037import org.apache.hadoop.io.Text;
038import org.apache.hadoop.security.token.SecretManager;
039import org.apache.hadoop.security.token.Token;
040import org.apache.zookeeper.KeeperException;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Manages an internal list of secret keys used to sign new authentication
046 * tokens as they are generated, and to valid existing tokens used for
047 * authentication.
048 *
049 * <p>
050 * A single instance of {@code AuthenticationTokenSecretManager} will be
051 * running as the "leader" in a given HBase cluster.  The leader is responsible
052 * for periodically generating new secret keys, which are then distributed to
053 * followers via ZooKeeper, and for expiring previously used secret keys that
054 * are no longer needed (as any tokens using them have expired).
055 * </p>
056 */
057@InterfaceAudience.Private
058public class AuthenticationTokenSecretManager
059    extends SecretManager<AuthenticationTokenIdentifier> {
060
061  static final String NAME_PREFIX = "SecretManager-";
062
063  private static final Logger LOG = LoggerFactory.getLogger(
064      AuthenticationTokenSecretManager.class);
065
066  private long lastKeyUpdate;
067  private long keyUpdateInterval;
068  private long tokenMaxLifetime;
069  private ZKSecretWatcher zkWatcher;
070  private LeaderElector leaderElector;
071  private ZKClusterId clusterId;
072
073  private Map<Integer,AuthenticationKey> allKeys = new ConcurrentHashMap<>();
074  private AuthenticationKey currentKey;
075
076  private int idSeq;
077  private AtomicLong tokenSeq = new AtomicLong();
078  private String name;
079
080  /**
081   * Create a new secret manager instance for generating keys.
082   * @param conf Configuration to use
083   * @param zk Connection to zookeeper for handling leader elections
084   * @param keyUpdateInterval Time (in milliseconds) between rolling a new master key for token signing
085   * @param tokenMaxLifetime Maximum age (in milliseconds) before a token expires and is no longer valid
086   */
087  /* TODO: Restrict access to this constructor to make rogues instances more difficult.
088   * For the moment this class is instantiated from
089   * org.apache.hadoop.hbase.ipc.SecureServer so public access is needed.
090   */
091  public AuthenticationTokenSecretManager(Configuration conf,
092                                          ZKWatcher zk, String serverName,
093                                          long keyUpdateInterval, long tokenMaxLifetime) {
094    this.zkWatcher = new ZKSecretWatcher(conf, zk, this);
095    this.keyUpdateInterval = keyUpdateInterval;
096    this.tokenMaxLifetime = tokenMaxLifetime;
097    this.leaderElector = new LeaderElector(zk, serverName);
098    this.name = NAME_PREFIX+serverName;
099    this.clusterId = new ZKClusterId(zk, zk);
100  }
101
102  public void start() {
103    try {
104      // populate any existing keys
105      this.zkWatcher.start();
106      // try to become leader
107      this.leaderElector.start();
108    } catch (KeeperException ke) {
109      LOG.error("ZooKeeper initialization failed", ke);
110    }
111  }
112
113  public void stop() {
114    this.leaderElector.stop("SecretManager stopping");
115  }
116
117  public boolean isMaster() {
118    return leaderElector.isMaster();
119  }
120
121  public String getName() {
122    return name;
123  }
124
125  @Override
126  protected synchronized byte[] createPassword(AuthenticationTokenIdentifier identifier) {
127    long now = EnvironmentEdgeManager.currentTime();
128    AuthenticationKey secretKey = currentKey;
129    identifier.setKeyId(secretKey.getKeyId());
130    identifier.setIssueDate(now);
131    identifier.setExpirationDate(now + tokenMaxLifetime);
132    identifier.setSequenceNumber(tokenSeq.getAndIncrement());
133    return createPassword(identifier.getBytes(),
134        secretKey.getKey());
135  }
136
137  @Override
138  public byte[] retrievePassword(AuthenticationTokenIdentifier identifier)
139      throws InvalidToken {
140    long now = EnvironmentEdgeManager.currentTime();
141    if (identifier.getExpirationDate() < now) {
142      throw new InvalidToken("Token has expired");
143    }
144    AuthenticationKey masterKey = allKeys.get(identifier.getKeyId());
145    if(masterKey == null) {
146      if(zkWatcher.getWatcher().isAborted()) {
147        LOG.error("ZKWatcher is abort");
148        throw new InvalidToken("Token keys could not be sync from zookeeper"
149            + " because of ZKWatcher abort");
150      }
151      synchronized (this) {
152        if (!leaderElector.isAlive() || leaderElector.isStopped()) {
153          LOG.warn("Thread leaderElector[" + leaderElector.getName() + ":"
154              + leaderElector.getId() + "] is stopped or not alive");
155          leaderElector.start();
156          LOG.info("Thread leaderElector [" + leaderElector.getName() + ":"
157              + leaderElector.getId() + "] is started");
158        }
159      }
160      zkWatcher.refreshKeys();
161      if (LOG.isDebugEnabled()) {
162        LOG.debug("Sync token keys from zookeeper");
163      }
164      masterKey = allKeys.get(identifier.getKeyId());
165    }
166    if (masterKey == null) {
167      throw new InvalidToken("Unknown master key for token (id="+
168          identifier.getKeyId()+")");
169    }
170    // regenerate the password
171    return createPassword(identifier.getBytes(),
172        masterKey.getKey());
173  }
174
175  @Override
176  public AuthenticationTokenIdentifier createIdentifier() {
177    return new AuthenticationTokenIdentifier();
178  }
179
180  public Token<AuthenticationTokenIdentifier> generateToken(String username) {
181    AuthenticationTokenIdentifier ident =
182        new AuthenticationTokenIdentifier(username);
183    Token<AuthenticationTokenIdentifier> token = new Token<>(ident, this);
184    if (clusterId.hasId()) {
185      token.setService(new Text(clusterId.getId()));
186    }
187    return token;
188  }
189
190  public synchronized void addKey(AuthenticationKey key) throws IOException {
191    // ignore zk changes when running as master
192    if (leaderElector.isMaster()) {
193      if (LOG.isDebugEnabled()) {
194        LOG.debug("Running as master, ignoring new key "+key.getKeyId());
195      }
196      return;
197    }
198
199    if (LOG.isDebugEnabled()) {
200      LOG.debug("Adding key "+key.getKeyId());
201    }
202
203    allKeys.put(key.getKeyId(), key);
204    if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) {
205      currentKey = key;
206    }
207    // update current sequence
208    if (key.getKeyId() > idSeq) {
209      idSeq = key.getKeyId();
210    }
211  }
212
213  synchronized boolean removeKey(Integer keyId) {
214    // ignore zk changes when running as master
215    if (leaderElector.isMaster()) {
216      if (LOG.isDebugEnabled()) {
217        LOG.debug("Running as master, ignoring removed key "+keyId);
218      }
219      return false;
220    }
221
222    if (LOG.isDebugEnabled()) {
223      LOG.debug("Removing key "+keyId);
224    }
225
226    allKeys.remove(keyId);
227    return true;
228  }
229
230  synchronized AuthenticationKey getCurrentKey() {
231    return currentKey;
232  }
233
234  AuthenticationKey getKey(int keyId) {
235    return allKeys.get(keyId);
236  }
237
238  synchronized void removeExpiredKeys() {
239    if (!leaderElector.isMaster()) {
240      LOG.info("Skipping removeExpiredKeys() because not running as master.");
241      return;
242    }
243
244    long now = EnvironmentEdgeManager.currentTime();
245    Iterator<AuthenticationKey> iter = allKeys.values().iterator();
246    while (iter.hasNext()) {
247      AuthenticationKey key = iter.next();
248      if (key.getExpiration() < now) {
249        if (LOG.isDebugEnabled()) {
250          LOG.debug("Removing expired key "+key.getKeyId());
251        }
252        iter.remove();
253        zkWatcher.removeKeyFromZK(key);
254      }
255    }
256  }
257
258  synchronized boolean isCurrentKeyRolled() {
259    return currentKey != null;
260  }
261
262  synchronized void rollCurrentKey() {
263    if (!leaderElector.isMaster()) {
264      LOG.info("Skipping rollCurrentKey() because not running as master.");
265      return;
266    }
267
268    long now = EnvironmentEdgeManager.currentTime();
269    AuthenticationKey prev = currentKey;
270    AuthenticationKey newKey = new AuthenticationKey(++idSeq,
271        Long.MAX_VALUE, // don't allow to expire until it's replaced by a new key
272        generateSecret());
273    allKeys.put(newKey.getKeyId(), newKey);
274    currentKey = newKey;
275    zkWatcher.addKeyToZK(newKey);
276    lastKeyUpdate = now;
277
278    if (prev != null) {
279      // make sure previous key is still stored
280      prev.setExpiration(now + tokenMaxLifetime);
281      allKeys.put(prev.getKeyId(), prev);
282      zkWatcher.updateKeyInZK(prev);
283    }
284  }
285
286  synchronized long getLastKeyUpdate() {
287    return lastKeyUpdate;
288  }
289
290  public static SecretKey createSecretKey(byte[] raw) {
291    return SecretManager.createSecretKey(raw);
292  }
293
294  private class LeaderElector extends Thread implements Stoppable {
295    private boolean stopped = false;
296    /** Flag indicating whether we're in charge of rolling/expiring keys */
297    private boolean isMaster = false;
298    private ZKLeaderManager zkLeader;
299
300    public LeaderElector(ZKWatcher watcher, String serverName) {
301      setDaemon(true);
302      setName("ZKSecretWatcher-leaderElector");
303      zkLeader = new ZKLeaderManager(watcher,
304          ZNodePaths.joinZNode(zkWatcher.getRootKeyZNode(), "keymaster"),
305          Bytes.toBytes(serverName), this);
306    }
307
308    public boolean isMaster() {
309      return isMaster;
310    }
311
312    @Override
313    public boolean isStopped() {
314      return stopped;
315    }
316
317    @Override
318    public void stop(String reason) {
319      if (stopped) {
320        return;
321      }
322
323      stopped = true;
324      // prevent further key generation when stopping
325      if (isMaster) {
326        zkLeader.stepDownAsLeader();
327      }
328      isMaster = false;
329      LOG.info("Stopping leader election, because: "+reason);
330      interrupt();
331    }
332
333    @Override
334    public void run() {
335      zkLeader.start();
336      zkLeader.waitToBecomeLeader();
337      isMaster = true;
338
339      while (!stopped) {
340        long now = EnvironmentEdgeManager.currentTime();
341
342        // clear any expired
343        removeExpiredKeys();
344        long localLastKeyUpdate = getLastKeyUpdate();
345        if (localLastKeyUpdate + keyUpdateInterval < now) {
346          // roll a new master key
347          rollCurrentKey();
348        }
349
350        try {
351          Thread.sleep(5000);
352        } catch (InterruptedException ie) {
353          if (LOG.isDebugEnabled()) {
354            LOG.debug("Interrupted waiting for next update", ie);
355          }
356        }
357      }
358    }
359  }
360}