1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.token;
20
21 import javax.crypto.SecretKey;
22 import java.io.IOException;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Stoppable;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
35 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
36 import org.apache.hadoop.hbase.zookeeper.ZKLeaderManager;
37 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
38 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
39 import org.apache.hadoop.io.Text;
40 import org.apache.hadoop.security.token.SecretManager;
41 import org.apache.hadoop.security.token.Token;
42 import org.apache.zookeeper.KeeperException;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 @InterfaceAudience.Private
58 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
59 justification="Complaint is about lastKeyUpdate... afraid to change it.")
60 public class AuthenticationTokenSecretManager
61 extends SecretManager<AuthenticationTokenIdentifier> {
62
63 static final String NAME_PREFIX = "SecretManager-";
64
65 private static final Log LOG = LogFactory.getLog(
66 AuthenticationTokenSecretManager.class);
67
68 private long lastKeyUpdate;
69 private long keyUpdateInterval;
70 private long tokenMaxLifetime;
71 private ZKSecretWatcher zkWatcher;
72 private LeaderElector leaderElector;
73 private ZKClusterId clusterId;
74
75 private Map<Integer,AuthenticationKey> allKeys =
76 new ConcurrentHashMap<Integer, AuthenticationKey>();
77 private AuthenticationKey currentKey;
78
79 private int idSeq;
80 private AtomicLong tokenSeq = new AtomicLong();
81 private String name;
82
83
84
85
86
87
88
89
90
91
92
93
94 public AuthenticationTokenSecretManager(Configuration conf,
95 ZooKeeperWatcher zk, String serverName,
96 long keyUpdateInterval, long tokenMaxLifetime) {
97 this.zkWatcher = new ZKSecretWatcher(conf, zk, this);
98 this.keyUpdateInterval = keyUpdateInterval;
99 this.tokenMaxLifetime = tokenMaxLifetime;
100 this.leaderElector = new LeaderElector(zk, serverName);
101 this.name = NAME_PREFIX+serverName;
102 this.clusterId = new ZKClusterId(zk, zk);
103 }
104
105 public void start() {
106 try {
107
108 this.zkWatcher.start();
109
110 this.leaderElector.start();
111 } catch (KeeperException ke) {
112 LOG.error("Zookeeper initialization failed", ke);
113 }
114 }
115
116 public void stop() {
117 this.leaderElector.stop("SecretManager stopping");
118 }
119
120 public boolean isMaster() {
121 return leaderElector.isMaster();
122 }
123
124 public String getName() {
125 return name;
126 }
127
128 @Override
129 protected synchronized byte[] createPassword(AuthenticationTokenIdentifier identifier) {
130 long now = EnvironmentEdgeManager.currentTime();
131 AuthenticationKey secretKey = currentKey;
132 identifier.setKeyId(secretKey.getKeyId());
133 identifier.setIssueDate(now);
134 identifier.setExpirationDate(now + tokenMaxLifetime);
135 identifier.setSequenceNumber(tokenSeq.getAndIncrement());
136 return createPassword(identifier.getBytes(),
137 secretKey.getKey());
138 }
139
140 @Override
141 public byte[] retrievePassword(AuthenticationTokenIdentifier identifier)
142 throws InvalidToken {
143 long now = EnvironmentEdgeManager.currentTime();
144 if (identifier.getExpirationDate() < now) {
145 throw new InvalidToken("Token has expired");
146 }
147 AuthenticationKey masterKey = allKeys.get(identifier.getKeyId());
148 if(masterKey == null) {
149 if(zkWatcher.getWatcher().isAborted()) {
150 LOG.error("ZookeeperWatcher is abort");
151 throw new InvalidToken("Token keys could not be sync from zookeeper"
152 + " because of ZookeeperWatcher abort");
153 }
154 synchronized (this) {
155 if (!leaderElector.isAlive() || leaderElector.isStopped()) {
156 LOG.warn("Thread leaderElector[" + leaderElector.getName() + ":"
157 + leaderElector.getId() + "] is stoped or not alive");
158 leaderElector.start();
159 LOG.info("Thread leaderElector [" + leaderElector.getName() + ":"
160 + leaderElector.getId() + "] is started");
161 }
162 }
163 zkWatcher.refreshKeys();
164 if (LOG.isDebugEnabled()) {
165 LOG.debug("Sync token keys from zookeeper");
166 }
167 masterKey = allKeys.get(identifier.getKeyId());
168 }
169 if (masterKey == null) {
170 throw new InvalidToken("Unknown master key for token (id="+
171 identifier.getKeyId()+")");
172 }
173
174 return createPassword(identifier.getBytes(),
175 masterKey.getKey());
176 }
177
178 @Override
179 public AuthenticationTokenIdentifier createIdentifier() {
180 return new AuthenticationTokenIdentifier();
181 }
182
183 public Token<AuthenticationTokenIdentifier> generateToken(String username) {
184 AuthenticationTokenIdentifier ident =
185 new AuthenticationTokenIdentifier(username);
186 Token<AuthenticationTokenIdentifier> token =
187 new Token<AuthenticationTokenIdentifier>(ident, this);
188 if (clusterId.hasId()) {
189 token.setService(new Text(clusterId.getId()));
190 }
191 return token;
192 }
193
194 public synchronized void addKey(AuthenticationKey key) throws IOException {
195
196 if (leaderElector.isMaster()) {
197 if (LOG.isDebugEnabled()) {
198 LOG.debug("Running as master, ignoring new key "+key.getKeyId());
199 }
200 return;
201 }
202
203 if (LOG.isDebugEnabled()) {
204 LOG.debug("Adding key "+key.getKeyId());
205 }
206
207 allKeys.put(key.getKeyId(), key);
208 if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) {
209 currentKey = key;
210 }
211
212 if (key.getKeyId() > idSeq) {
213 idSeq = key.getKeyId();
214 }
215 }
216
217 synchronized boolean removeKey(Integer keyId) {
218
219 if (leaderElector.isMaster()) {
220 if (LOG.isDebugEnabled()) {
221 LOG.debug("Running as master, ignoring removed key "+keyId);
222 }
223 return false;
224 }
225
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("Removing key "+keyId);
228 }
229
230 allKeys.remove(keyId);
231 return true;
232 }
233
234 synchronized AuthenticationKey getCurrentKey() {
235 return currentKey;
236 }
237
238 AuthenticationKey getKey(int keyId) {
239 return allKeys.get(keyId);
240 }
241
242 synchronized void removeExpiredKeys() {
243 if (!leaderElector.isMaster()) {
244 LOG.info("Skipping removeExpiredKeys() because not running as master.");
245 return;
246 }
247
248 long now = EnvironmentEdgeManager.currentTime();
249 Iterator<AuthenticationKey> iter = allKeys.values().iterator();
250 while (iter.hasNext()) {
251 AuthenticationKey key = iter.next();
252 if (key.getExpiration() < now) {
253 if (LOG.isDebugEnabled()) {
254 LOG.debug("Removing expired key "+key.getKeyId());
255 }
256 iter.remove();
257 zkWatcher.removeKeyFromZK(key);
258 }
259 }
260 }
261
262 synchronized boolean isCurrentKeyRolled() {
263 return currentKey != null;
264 }
265
266 synchronized void rollCurrentKey() {
267 if (!leaderElector.isMaster()) {
268 LOG.info("Skipping rollCurrentKey() because not running as master.");
269 return;
270 }
271
272 long now = EnvironmentEdgeManager.currentTime();
273 AuthenticationKey prev = currentKey;
274 AuthenticationKey newKey = new AuthenticationKey(++idSeq,
275 Long.MAX_VALUE,
276 generateSecret());
277 allKeys.put(newKey.getKeyId(), newKey);
278 currentKey = newKey;
279 zkWatcher.addKeyToZK(newKey);
280 lastKeyUpdate = now;
281
282 if (prev != null) {
283
284 prev.setExpiration(now + tokenMaxLifetime);
285 allKeys.put(prev.getKeyId(), prev);
286 zkWatcher.updateKeyInZK(prev);
287 }
288 }
289
290 synchronized long getLastKeyUpdate() {
291 return lastKeyUpdate;
292 }
293
294 public static SecretKey createSecretKey(byte[] raw) {
295 return SecretManager.createSecretKey(raw);
296 }
297
298 private class LeaderElector extends Thread implements Stoppable {
299 private boolean stopped = false;
300
301 private boolean isMaster = false;
302 private ZKLeaderManager zkLeader;
303
304 public LeaderElector(ZooKeeperWatcher watcher, String serverName) {
305 setDaemon(true);
306 setName("ZKSecretWatcher-leaderElector");
307 zkLeader = new ZKLeaderManager(watcher,
308 ZKUtil.joinZNode(zkWatcher.getRootKeyZNode(), "keymaster"),
309 Bytes.toBytes(serverName), this);
310 }
311
312 public boolean isMaster() {
313 return isMaster;
314 }
315
316 @Override
317 public boolean isStopped() {
318 return stopped;
319 }
320
321 @Override
322 public void stop(String reason) {
323 if (stopped) {
324 return;
325 }
326
327 stopped = true;
328
329 if (isMaster) {
330 zkLeader.stepDownAsLeader();
331 }
332 isMaster = false;
333 LOG.info("Stopping leader election, because: "+reason);
334 interrupt();
335 }
336
337 public void run() {
338 zkLeader.start();
339 zkLeader.waitToBecomeLeader();
340 isMaster = true;
341
342 while (!stopped) {
343 long now = EnvironmentEdgeManager.currentTime();
344
345
346 removeExpiredKeys();
347 long localLastKeyUpdate = getLastKeyUpdate();
348 if (localLastKeyUpdate + keyUpdateInterval < now) {
349
350 rollCurrentKey();
351 }
352
353 try {
354 Thread.sleep(5000);
355 } catch (InterruptedException ie) {
356 if (LOG.isDebugEnabled()) {
357 LOG.debug("Interrupted waiting for next update", ie);
358 }
359 }
360 }
361 }
362 }
363 }