1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.classification.InterfaceAudience;
24 import org.apache.hadoop.classification.InterfaceStability;
25 import org.apache.hadoop.hbase.util.Bytes;
26 import org.apache.hadoop.hbase.util.RetryCounter;
27 import org.apache.hadoop.hbase.util.RetryCounterFactory;
28 import org.apache.zookeeper.AsyncCallback;
29 import org.apache.zookeeper.CreateMode;
30 import org.apache.zookeeper.KeeperException;
31 import org.apache.zookeeper.Op;
32 import org.apache.zookeeper.OpResult;
33 import org.apache.zookeeper.Watcher;
34 import org.apache.zookeeper.ZooDefs;
35 import org.apache.zookeeper.ZooKeeper;
36 import org.apache.zookeeper.ZooKeeper.States;
37 import org.apache.zookeeper.data.ACL;
38 import org.apache.zookeeper.data.Stat;
39 import org.apache.zookeeper.proto.CreateRequest;
40 import org.apache.zookeeper.proto.SetDataRequest;
41
42 import java.io.IOException;
43 import java.lang.management.ManagementFactory;
44 import java.security.SecureRandom;
45 import java.util.ArrayList;
46 import java.util.LinkedList;
47 import java.util.List;
48 import java.util.Random;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 @InterfaceAudience.Public
74 @InterfaceStability.Evolving
75 public class RecoverableZooKeeper {
76 private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
77
78 volatile private ZooKeeper zk;
79 private final RetryCounterFactory retryCounterFactory;
80
81 private final String identifier;
82 private final byte[] id;
83 private Watcher watcher;
84 private int sessionTimeout;
85 private String quorumServers;
86 private final Random salter;
87
88
89
90
91
92
93
94
95
96 private static final byte MAGIC =(byte) 0XFF;
97 private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
98 private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
99 private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
100
/**
 * Creates a wrapper without an explicit identifier; delegates to the
 * six-argument constructor with a {@code null} identifier, which makes it
 * fall back to the runtime MXBean name.
 *
 * @param quorumServers ZooKeeper connect string
 * @param sessionTimeout session timeout handed to the ZooKeeper client
 * @param watcher default watcher handed to the ZooKeeper client
 * @param maxRetries retry budget given to the RetryCounterFactory
 * @param retryIntervalMillis retry interval given to the RetryCounterFactory
 * @throws IOException if the underlying ZooKeeper client cannot be created
 */
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
    Watcher watcher, int maxRetries, int retryIntervalMillis)
    throws IOException {
  this(quorumServers, sessionTimeout, watcher, maxRetries,
      retryIntervalMillis, null);
}
107
108 public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
109 Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
110 throws IOException {
111 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
112 this.retryCounterFactory =
113 new RetryCounterFactory(maxRetries, retryIntervalMillis);
114
115 if (identifier == null || identifier.length() == 0) {
116
117 identifier = ManagementFactory.getRuntimeMXBean().getName();
118 }
119 LOG.info("Process identifier=" + identifier +
120 " connecting to ZooKeeper ensemble=" + quorumServers);
121 this.identifier = identifier;
122 this.id = Bytes.toBytes(identifier);
123
124 this.watcher = watcher;
125 this.sessionTimeout = sessionTimeout;
126 this.quorumServers = quorumServers;
127 salter = new SecureRandom();
128 }
129
130 public void reconnectAfterExpiration()
131 throws IOException, InterruptedException {
132 LOG.info("Closing dead ZooKeeper connection, session" +
133 " was: 0x"+Long.toHexString(zk.getSessionId()));
134 zk.close();
135 this.zk = new ZooKeeper(this.quorumServers,
136 this.sessionTimeout, this.watcher);
137 LOG.info("Recreated a ZooKeeper, session" +
138 " is: 0x"+Long.toHexString(zk.getSessionId()));
139 }
140
141
142
143
144
145
146 public void delete(String path, int version)
147 throws InterruptedException, KeeperException {
148 RetryCounter retryCounter = retryCounterFactory.create();
149 boolean isRetry = false;
150 while (true) {
151 try {
152 zk.delete(path, version);
153 return;
154 } catch (KeeperException e) {
155 switch (e.code()) {
156 case NONODE:
157 if (isRetry) {
158 LOG.info("Node " + path + " already deleted. Assuming a " +
159 "previous attempt succeeded.");
160 return;
161 }
162 LOG.warn("Node " + path + " already deleted, retry=" + isRetry);
163 throw e;
164
165 case CONNECTIONLOSS:
166 case SESSIONEXPIRED:
167 case OPERATIONTIMEOUT:
168 retryOrThrow(retryCounter, e, "delete");
169 break;
170
171 default:
172 throw e;
173 }
174 }
175 retryCounter.sleepUntilNextRetry();
176 retryCounter.useRetry();
177 isRetry = true;
178 }
179 }
180
181
182
183
184
185 public Stat exists(String path, Watcher watcher)
186 throws KeeperException, InterruptedException {
187 RetryCounter retryCounter = retryCounterFactory.create();
188 while (true) {
189 try {
190 return zk.exists(path, watcher);
191 } catch (KeeperException e) {
192 switch (e.code()) {
193 case CONNECTIONLOSS:
194 case SESSIONEXPIRED:
195 case OPERATIONTIMEOUT:
196 retryOrThrow(retryCounter, e, "exists");
197 break;
198
199 default:
200 throw e;
201 }
202 }
203 retryCounter.sleepUntilNextRetry();
204 retryCounter.useRetry();
205 }
206 }
207
208
209
210
211
212 public Stat exists(String path, boolean watch)
213 throws KeeperException, InterruptedException {
214 RetryCounter retryCounter = retryCounterFactory.create();
215 while (true) {
216 try {
217 return zk.exists(path, watch);
218 } catch (KeeperException e) {
219 switch (e.code()) {
220 case CONNECTIONLOSS:
221 case SESSIONEXPIRED:
222 case OPERATIONTIMEOUT:
223 retryOrThrow(retryCounter, e, "exists");
224 break;
225
226 default:
227 throw e;
228 }
229 }
230 retryCounter.sleepUntilNextRetry();
231 retryCounter.useRetry();
232 }
233 }
234
235 private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
236 String opName) throws KeeperException {
237 LOG.warn("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
238 if (!retryCounter.shouldRetry()) {
239 LOG.error("ZooKeeper " + opName + " failed after "
240 + retryCounter.getMaxRetries() + " retries");
241 throw e;
242 }
243 }
244
245
246
247
248
249 public List<String> getChildren(String path, Watcher watcher)
250 throws KeeperException, InterruptedException {
251 RetryCounter retryCounter = retryCounterFactory.create();
252 while (true) {
253 try {
254 return zk.getChildren(path, watcher);
255 } catch (KeeperException e) {
256 switch (e.code()) {
257 case CONNECTIONLOSS:
258 case SESSIONEXPIRED:
259 case OPERATIONTIMEOUT:
260 retryOrThrow(retryCounter, e, "getChildren");
261 break;
262
263 default:
264 throw e;
265 }
266 }
267 retryCounter.sleepUntilNextRetry();
268 retryCounter.useRetry();
269 }
270 }
271
272
273
274
275
276 public List<String> getChildren(String path, boolean watch)
277 throws KeeperException, InterruptedException {
278 RetryCounter retryCounter = retryCounterFactory.create();
279 while (true) {
280 try {
281 return zk.getChildren(path, watch);
282 } catch (KeeperException e) {
283 switch (e.code()) {
284 case CONNECTIONLOSS:
285 case SESSIONEXPIRED:
286 case OPERATIONTIMEOUT:
287 retryOrThrow(retryCounter, e, "getChildren");
288 break;
289
290 default:
291 throw e;
292 }
293 }
294 retryCounter.sleepUntilNextRetry();
295 retryCounter.useRetry();
296 }
297 }
298
299
300
301
302
303 public byte[] getData(String path, Watcher watcher, Stat stat)
304 throws KeeperException, InterruptedException {
305 RetryCounter retryCounter = retryCounterFactory.create();
306 while (true) {
307 try {
308 byte[] revData = zk.getData(path, watcher, stat);
309 return this.removeMetaData(revData);
310 } catch (KeeperException e) {
311 switch (e.code()) {
312 case CONNECTIONLOSS:
313 case SESSIONEXPIRED:
314 case OPERATIONTIMEOUT:
315 retryOrThrow(retryCounter, e, "getData");
316 break;
317
318 default:
319 throw e;
320 }
321 }
322 retryCounter.sleepUntilNextRetry();
323 retryCounter.useRetry();
324 }
325 }
326
327
328
329
330
331 public byte[] getData(String path, boolean watch, Stat stat)
332 throws KeeperException, InterruptedException {
333 RetryCounter retryCounter = retryCounterFactory.create();
334 while (true) {
335 try {
336 byte[] revData = zk.getData(path, watch, stat);
337 return this.removeMetaData(revData);
338 } catch (KeeperException e) {
339 switch (e.code()) {
340 case CONNECTIONLOSS:
341 case SESSIONEXPIRED:
342 case OPERATIONTIMEOUT:
343 retryOrThrow(retryCounter, e, "getData");
344 break;
345
346 default:
347 throw e;
348 }
349 }
350 retryCounter.sleepUntilNextRetry();
351 retryCounter.useRetry();
352 }
353 }
354
355
356
357
358
359
360
361 public Stat setData(String path, byte[] data, int version)
362 throws KeeperException, InterruptedException {
363 RetryCounter retryCounter = retryCounterFactory.create();
364 byte[] newData = appendMetaData(data);
365 boolean isRetry = false;
366 while (true) {
367 try {
368 return zk.setData(path, newData, version);
369 } catch (KeeperException e) {
370 switch (e.code()) {
371 case CONNECTIONLOSS:
372 case SESSIONEXPIRED:
373 case OPERATIONTIMEOUT:
374 retryOrThrow(retryCounter, e, "setData");
375 break;
376 case BADVERSION:
377 if (isRetry) {
378
379 try{
380 Stat stat = new Stat();
381 byte[] revData = zk.getData(path, false, stat);
382 if(Bytes.compareTo(revData, newData) == 0) {
383
384 return stat;
385 }
386 } catch(KeeperException keeperException){
387
388 throw keeperException;
389 }
390 }
391
392 default:
393 throw e;
394 }
395 }
396 retryCounter.sleepUntilNextRetry();
397 retryCounter.useRetry();
398 isRetry = true;
399 }
400 }
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417 public String create(String path, byte[] data, List<ACL> acl,
418 CreateMode createMode)
419 throws KeeperException, InterruptedException {
420 byte[] newData = appendMetaData(data);
421 switch (createMode) {
422 case EPHEMERAL:
423 case PERSISTENT:
424 return createNonSequential(path, newData, acl, createMode);
425
426 case EPHEMERAL_SEQUENTIAL:
427 case PERSISTENT_SEQUENTIAL:
428 return createSequential(path, newData, acl, createMode);
429
430 default:
431 throw new IllegalArgumentException("Unrecognized CreateMode: " +
432 createMode);
433 }
434 }
435
436 private String createNonSequential(String path, byte[] data, List<ACL> acl,
437 CreateMode createMode) throws KeeperException, InterruptedException {
438 RetryCounter retryCounter = retryCounterFactory.create();
439 boolean isRetry = false;
440 while (true) {
441 try {
442 return zk.create(path, data, acl, createMode);
443 } catch (KeeperException e) {
444 switch (e.code()) {
445 case NODEEXISTS:
446 if (isRetry) {
447
448
449
450 byte[] currentData = zk.getData(path, false, null);
451 if (currentData != null &&
452 Bytes.compareTo(currentData, data) == 0) {
453
454 return path;
455 }
456 LOG.error("Node " + path + " already exists with " +
457 Bytes.toStringBinary(currentData) + ", could not write " +
458 Bytes.toStringBinary(data));
459 throw e;
460 }
461 LOG.info("Node " + path + " already exists and this is not a " +
462 "retry");
463 throw e;
464
465 case CONNECTIONLOSS:
466 case SESSIONEXPIRED:
467 case OPERATIONTIMEOUT:
468 retryOrThrow(retryCounter, e, "create");
469 break;
470
471 default:
472 throw e;
473 }
474 }
475 retryCounter.sleepUntilNextRetry();
476 retryCounter.useRetry();
477 isRetry = true;
478 }
479 }
480
481 private String createSequential(String path, byte[] data,
482 List<ACL> acl, CreateMode createMode)
483 throws KeeperException, InterruptedException {
484 RetryCounter retryCounter = retryCounterFactory.create();
485 boolean first = true;
486 String newPath = path+this.identifier;
487 while (true) {
488 try {
489 if (!first) {
490
491 String previousResult = findPreviousSequentialNode(newPath);
492 if (previousResult != null) {
493 return previousResult;
494 }
495 }
496 first = false;
497 return zk.create(newPath, data, acl, createMode);
498 } catch (KeeperException e) {
499 switch (e.code()) {
500 case CONNECTIONLOSS:
501 case SESSIONEXPIRED:
502 case OPERATIONTIMEOUT:
503 retryOrThrow(retryCounter, e, "create");
504 break;
505
506 default:
507 throw e;
508 }
509 }
510 retryCounter.sleepUntilNextRetry();
511 retryCounter.useRetry();
512 }
513 }
514
515
516
517
518 private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
519 throws UnsupportedOperationException {
520 if(ops == null) return null;
521
522 List<Op> preparedOps = new LinkedList<Op>();
523 for (Op op : ops) {
524 if (op.getType() == ZooDefs.OpCode.create) {
525 CreateRequest create = (CreateRequest)op.toRequestRecord();
526 preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
527 create.getAcl(), create.getFlags()));
528 } else if (op.getType() == ZooDefs.OpCode.delete) {
529
530 preparedOps.add(op);
531 } else if (op.getType() == ZooDefs.OpCode.setData) {
532 SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
533 preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
534 setData.getVersion()));
535 } else {
536 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
537 }
538 }
539 return preparedOps;
540 }
541
542
543
544
545 public List<OpResult> multi(Iterable<Op> ops)
546 throws KeeperException, InterruptedException {
547 RetryCounter retryCounter = retryCounterFactory.create();
548 Iterable<Op> multiOps = prepareZKMulti(ops);
549 while (true) {
550 try {
551 return zk.multi(multiOps);
552 } catch (KeeperException e) {
553 switch (e.code()) {
554 case CONNECTIONLOSS:
555 case SESSIONEXPIRED:
556 case OPERATIONTIMEOUT:
557 retryOrThrow(retryCounter, e, "multi");
558 break;
559
560 default:
561 throw e;
562 }
563 }
564 retryCounter.sleepUntilNextRetry();
565 retryCounter.useRetry();
566 }
567 }
568
569 private String findPreviousSequentialNode(String path)
570 throws KeeperException, InterruptedException {
571 int lastSlashIdx = path.lastIndexOf('/');
572 assert(lastSlashIdx != -1);
573 String parent = path.substring(0, lastSlashIdx);
574 String nodePrefix = path.substring(lastSlashIdx+1);
575
576 List<String> nodes = zk.getChildren(parent, false);
577 List<String> matching = filterByPrefix(nodes, nodePrefix);
578 for (String node : matching) {
579 String nodePath = parent + "/" + node;
580 Stat stat = zk.exists(nodePath, false);
581 if (stat != null) {
582 return nodePath;
583 }
584 }
585 return null;
586 }
587
588 public byte[] removeMetaData(byte[] data) {
589 if(data == null || data.length == 0) {
590 return data;
591 }
592
593 byte magic = data[0];
594 if(magic != MAGIC) {
595 return data;
596 }
597
598 int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
599 int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
600 int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
601
602 byte[] newData = new byte[dataLength];
603 System.arraycopy(data, dataOffset, newData, 0, dataLength);
604 return newData;
605 }
606
607 private byte[] appendMetaData(byte[] data) {
608 if(data == null || data.length == 0){
609 return data;
610 }
611 byte[] salt = Bytes.toBytes(salter.nextLong());
612 int idLength = id.length + salt.length;
613 byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
614 int pos = 0;
615 pos = Bytes.putByte(newData, pos, MAGIC);
616 pos = Bytes.putInt(newData, pos, idLength);
617 pos = Bytes.putBytes(newData, pos, id, 0, id.length);
618 pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
619 pos = Bytes.putBytes(newData, pos, data, 0, data.length);
620 return newData;
621 }
622
623 public long getSessionId() {
624 return zk.getSessionId();
625 }
626
627 public void close() throws InterruptedException {
628 zk.close();
629 }
630
631 public States getState() {
632 return zk.getState();
633 }
634
635 public ZooKeeper getZooKeeper() {
636 return zk;
637 }
638
639 public byte[] getSessionPasswd() {
640 return zk.getSessionPasswd();
641 }
642
643 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
644 this.zk.sync(path, null, null);
645 }
646
647
648
649
650
651
652
653
654
655
656 private static List<String> filterByPrefix(List<String> nodes,
657 String... prefixes) {
658 List<String> lockChildren = new ArrayList<String>();
659 for (String child : nodes){
660 for (String prefix : prefixes){
661 if (child.startsWith(prefix)){
662 lockChildren.add(child);
663 break;
664 }
665 }
666 }
667 return lockChildren;
668 }
669
670 public String getIdentifier() {
671 return identifier;
672 }
673 }