1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.IOException;
22 import java.lang.management.ManagementFactory;
23 import java.util.ArrayList;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Random;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.util.Bytes;
32 import org.apache.hadoop.hbase.util.RetryCounter;
33 import org.apache.hadoop.hbase.util.RetryCounterFactory;
34 import org.apache.zookeeper.AsyncCallback;
35 import org.apache.zookeeper.CreateMode;
36 import org.apache.zookeeper.KeeperException;
37 import org.apache.zookeeper.Op;
38 import org.apache.zookeeper.OpResult;
39 import org.apache.zookeeper.Watcher;
40 import org.apache.zookeeper.ZooDefs;
41 import org.apache.zookeeper.ZooKeeper;
42 import org.apache.zookeeper.ZooKeeper.States;
43 import org.apache.zookeeper.data.ACL;
44 import org.apache.zookeeper.data.Stat;
45 import org.apache.zookeeper.proto.CreateRequest;
46 import org.apache.zookeeper.proto.SetDataRequest;
47 import org.apache.htrace.Trace;
48 import org.apache.htrace.TraceScope;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 @InterfaceAudience.Private
74 public class RecoverableZooKeeper {
75 private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
76
77 private ZooKeeper zk;
78 private final RetryCounterFactory retryCounterFactory;
79
80 private final String identifier;
81 private final byte[] id;
82 private Watcher watcher;
83 private int sessionTimeout;
84 private String quorumServers;
85 private final Random salter;
86
87
88
89
90
91
92
93
94
95 private static final byte MAGIC =(byte) 0XFF;
96 private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
97 private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
98 private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
99
/**
 * Creates a client with no explicit identifier; the six-argument constructor then
 * falls back to the JVM runtime name.
 *
 * @param quorumServers ZooKeeper connect string
 * @param sessionTimeout session timeout handed to the ZooKeeper constructor
 * @param watcher watcher registered on every ZooKeeper handle created
 * @param maxRetries number of retries allowed for transient failures
 * @param retryIntervalMillis retry interval handed to the retry counter factory
 * @throws IOException as declared by the delegated constructor
 */
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
    Watcher watcher, int maxRetries, int retryIntervalMillis) throws IOException {
  this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, null);
}
106
/**
 * Creates a client that retries transient ZooKeeper failures and tags written
 * data with a per-process metadata prefix.
 *
 * <p>An initial connection is attempted eagerly. If it fails, the failure is
 * ignored because every operation calls checkZk() again before use.
 *
 * @param quorumServers ZooKeeper connect string
 * @param sessionTimeout session timeout handed to the ZooKeeper constructor
 * @param watcher watcher registered on every ZooKeeper handle created
 * @param maxRetries retries allowed for transient failures; the factory receives
 *     {@code maxRetries + 1} (presumably the total attempt count — confirm in RetryCounterFactory)
 * @param retryIntervalMillis retry interval handed to the retry counter factory
 * @param identifier process identifier; null or empty selects the JVM runtime name
 * @throws IOException as declared by the signature
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
    justification="None. Its always been this way.")
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
    Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
    throws IOException {
  this.retryCounterFactory =
      new RetryCounterFactory(maxRetries + 1, retryIntervalMillis);

  if (identifier == null || identifier.length() == 0) {
    identifier = ManagementFactory.getRuntimeMXBean().getName();
  }
  LOG.info("Process identifier=" + identifier +
      " connecting to ZooKeeper ensemble=" + quorumServers);
  this.identifier = identifier;
  this.id = Bytes.toBytes(identifier);

  this.watcher = watcher;
  this.sessionTimeout = sessionTimeout;
  this.quorumServers = quorumServers;
  // appendMetaData needs the salt source on every write, so it must be created
  // whether or not the eager connection attempt below succeeds. Previously it was
  // only assigned inside the catch block, leaving it null on a successful connect.
  this.salter = new Random();
  try {
    checkZk();
  } catch (Exception ignored) {
    // Deliberate best effort: checkZk() already logs connection failures, and each
    // operation calls checkZk() again, so the connection is retried lazily.
  }
}
131
132
133
134
135
136
137
138 protected synchronized ZooKeeper checkZk() throws KeeperException {
139 if (this.zk == null) {
140 try {
141 this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
142 } catch (IOException ex) {
143 LOG.warn("Unable to create ZooKeeper Connection", ex);
144 throw new KeeperException.OperationTimeoutException();
145 }
146 }
147 return zk;
148 }
149
150 public synchronized void reconnectAfterExpiration()
151 throws IOException, KeeperException, InterruptedException {
152 if (zk != null) {
153 LOG.info("Closing dead ZooKeeper connection, session" +
154 " was: 0x"+Long.toHexString(zk.getSessionId()));
155 zk.close();
156
157 zk = null;
158 }
159 checkZk();
160 LOG.info("Recreated a ZooKeeper, session" +
161 " is: 0x"+Long.toHexString(zk.getSessionId()));
162 }
163
164
165
166
167
168
169 public void delete(String path, int version)
170 throws InterruptedException, KeeperException {
171 TraceScope traceScope = null;
172 try {
173 traceScope = Trace.startSpan("RecoverableZookeeper.delete");
174 RetryCounter retryCounter = retryCounterFactory.create();
175 boolean isRetry = false;
176 while (true) {
177 try {
178 checkZk().delete(path, version);
179 return;
180 } catch (KeeperException e) {
181 switch (e.code()) {
182 case NONODE:
183 if (isRetry) {
184 LOG.debug("Node " + path + " already deleted. Assuming a " +
185 "previous attempt succeeded.");
186 return;
187 }
188 LOG.debug("Node " + path + " already deleted, retry=" + isRetry);
189 throw e;
190
191 case CONNECTIONLOSS:
192 case OPERATIONTIMEOUT:
193 retryOrThrow(retryCounter, e, "delete");
194 break;
195
196 default:
197 throw e;
198 }
199 }
200 retryCounter.sleepUntilNextRetry();
201 isRetry = true;
202 }
203 } finally {
204 if (traceScope != null) traceScope.close();
205 }
206 }
207
208
209
210
211
212 public Stat exists(String path, Watcher watcher)
213 throws KeeperException, InterruptedException {
214 TraceScope traceScope = null;
215 try {
216 traceScope = Trace.startSpan("RecoverableZookeeper.exists");
217 RetryCounter retryCounter = retryCounterFactory.create();
218 while (true) {
219 try {
220 return checkZk().exists(path, watcher);
221 } catch (KeeperException e) {
222 switch (e.code()) {
223 case CONNECTIONLOSS:
224 case OPERATIONTIMEOUT:
225 retryOrThrow(retryCounter, e, "exists");
226 break;
227
228 default:
229 throw e;
230 }
231 }
232 retryCounter.sleepUntilNextRetry();
233 }
234 } finally {
235 if (traceScope != null) traceScope.close();
236 }
237 }
238
239
240
241
242
243 public Stat exists(String path, boolean watch)
244 throws KeeperException, InterruptedException {
245 TraceScope traceScope = null;
246 try {
247 traceScope = Trace.startSpan("RecoverableZookeeper.exists");
248 RetryCounter retryCounter = retryCounterFactory.create();
249 while (true) {
250 try {
251 return checkZk().exists(path, watch);
252 } catch (KeeperException e) {
253 switch (e.code()) {
254 case CONNECTIONLOSS:
255 case OPERATIONTIMEOUT:
256 retryOrThrow(retryCounter, e, "exists");
257 break;
258
259 default:
260 throw e;
261 }
262 }
263 retryCounter.sleepUntilNextRetry();
264 }
265 } finally {
266 if (traceScope != null) traceScope.close();
267 }
268 }
269
270 private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
271 String opName) throws KeeperException {
272 LOG.debug("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
273 if (!retryCounter.shouldRetry()) {
274 LOG.error("ZooKeeper " + opName + " failed after "
275 + retryCounter.getMaxAttempts() + " attempts");
276 throw e;
277 }
278 }
279
280
281
282
283
284 public List<String> getChildren(String path, Watcher watcher)
285 throws KeeperException, InterruptedException {
286 TraceScope traceScope = null;
287 try {
288 traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
289 RetryCounter retryCounter = retryCounterFactory.create();
290 while (true) {
291 try {
292 return checkZk().getChildren(path, watcher);
293 } catch (KeeperException e) {
294 switch (e.code()) {
295 case CONNECTIONLOSS:
296 case OPERATIONTIMEOUT:
297 retryOrThrow(retryCounter, e, "getChildren");
298 break;
299
300 default:
301 throw e;
302 }
303 }
304 retryCounter.sleepUntilNextRetry();
305 }
306 } finally {
307 if (traceScope != null) traceScope.close();
308 }
309 }
310
311
312
313
314
315 public List<String> getChildren(String path, boolean watch)
316 throws KeeperException, InterruptedException {
317 TraceScope traceScope = null;
318 try {
319 traceScope = Trace.startSpan("RecoverableZookeeper.getChildren");
320 RetryCounter retryCounter = retryCounterFactory.create();
321 while (true) {
322 try {
323 return checkZk().getChildren(path, watch);
324 } catch (KeeperException e) {
325 switch (e.code()) {
326 case CONNECTIONLOSS:
327 case OPERATIONTIMEOUT:
328 retryOrThrow(retryCounter, e, "getChildren");
329 break;
330
331 default:
332 throw e;
333 }
334 }
335 retryCounter.sleepUntilNextRetry();
336 }
337 } finally {
338 if (traceScope != null) traceScope.close();
339 }
340 }
341
342
343
344
345
346 public byte[] getData(String path, Watcher watcher, Stat stat)
347 throws KeeperException, InterruptedException {
348 TraceScope traceScope = null;
349 try {
350 traceScope = Trace.startSpan("RecoverableZookeeper.getData");
351 RetryCounter retryCounter = retryCounterFactory.create();
352 while (true) {
353 try {
354 byte[] revData = checkZk().getData(path, watcher, stat);
355 return this.removeMetaData(revData);
356 } catch (KeeperException e) {
357 switch (e.code()) {
358 case CONNECTIONLOSS:
359 case OPERATIONTIMEOUT:
360 retryOrThrow(retryCounter, e, "getData");
361 break;
362
363 default:
364 throw e;
365 }
366 }
367 retryCounter.sleepUntilNextRetry();
368 }
369 } finally {
370 if (traceScope != null) traceScope.close();
371 }
372 }
373
374
375
376
377
378 public byte[] getData(String path, boolean watch, Stat stat)
379 throws KeeperException, InterruptedException {
380 TraceScope traceScope = null;
381 try {
382 traceScope = Trace.startSpan("RecoverableZookeeper.getData");
383 RetryCounter retryCounter = retryCounterFactory.create();
384 while (true) {
385 try {
386 byte[] revData = checkZk().getData(path, watch, stat);
387 return this.removeMetaData(revData);
388 } catch (KeeperException e) {
389 switch (e.code()) {
390 case CONNECTIONLOSS:
391 case OPERATIONTIMEOUT:
392 retryOrThrow(retryCounter, e, "getData");
393 break;
394
395 default:
396 throw e;
397 }
398 }
399 retryCounter.sleepUntilNextRetry();
400 }
401 } finally {
402 if (traceScope != null) traceScope.close();
403 }
404 }
405
406
407
408
409
410
411
412 public Stat setData(String path, byte[] data, int version)
413 throws KeeperException, InterruptedException {
414 TraceScope traceScope = null;
415 try {
416 traceScope = Trace.startSpan("RecoverableZookeeper.setData");
417 RetryCounter retryCounter = retryCounterFactory.create();
418 byte[] newData = appendMetaData(data);
419 boolean isRetry = false;
420 while (true) {
421 try {
422 return checkZk().setData(path, newData, version);
423 } catch (KeeperException e) {
424 switch (e.code()) {
425 case CONNECTIONLOSS:
426 case OPERATIONTIMEOUT:
427 retryOrThrow(retryCounter, e, "setData");
428 break;
429 case BADVERSION:
430 if (isRetry) {
431
432 try{
433 Stat stat = new Stat();
434 byte[] revData = checkZk().getData(path, false, stat);
435 if(Bytes.compareTo(revData, newData) == 0) {
436
437 return stat;
438 }
439 } catch(KeeperException keeperException){
440
441 throw keeperException;
442 }
443 }
444
445 default:
446 throw e;
447 }
448 }
449 retryCounter.sleepUntilNextRetry();
450 isRetry = true;
451 }
452 } finally {
453 if (traceScope != null) traceScope.close();
454 }
455 }
456
457
458
459
460
461 public List<ACL> getAcl(String path, Stat stat)
462 throws KeeperException, InterruptedException {
463 TraceScope traceScope = null;
464 try {
465 traceScope = Trace.startSpan("RecoverableZookeeper.getAcl");
466 RetryCounter retryCounter = retryCounterFactory.create();
467 while (true) {
468 try {
469 return checkZk().getACL(path, stat);
470 } catch (KeeperException e) {
471 switch (e.code()) {
472 case CONNECTIONLOSS:
473 case OPERATIONTIMEOUT:
474 retryOrThrow(retryCounter, e, "getAcl");
475 break;
476
477 default:
478 throw e;
479 }
480 }
481 retryCounter.sleepUntilNextRetry();
482 }
483 } finally {
484 if (traceScope != null) traceScope.close();
485 }
486 }
487
488
489
490
491
492 public Stat setAcl(String path, List<ACL> acls, int version)
493 throws KeeperException, InterruptedException {
494 TraceScope traceScope = null;
495 try {
496 traceScope = Trace.startSpan("RecoverableZookeeper.setAcl");
497 RetryCounter retryCounter = retryCounterFactory.create();
498 while (true) {
499 try {
500 return checkZk().setACL(path, acls, version);
501 } catch (KeeperException e) {
502 switch (e.code()) {
503 case CONNECTIONLOSS:
504 case OPERATIONTIMEOUT:
505 retryOrThrow(retryCounter, e, "setAcl");
506 break;
507
508 default:
509 throw e;
510 }
511 }
512 retryCounter.sleepUntilNextRetry();
513 }
514 } finally {
515 if (traceScope != null) traceScope.close();
516 }
517 }
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534 public String create(String path, byte[] data, List<ACL> acl,
535 CreateMode createMode)
536 throws KeeperException, InterruptedException {
537 TraceScope traceScope = null;
538 try {
539 traceScope = Trace.startSpan("RecoverableZookeeper.create");
540 byte[] newData = appendMetaData(data);
541 switch (createMode) {
542 case EPHEMERAL:
543 case PERSISTENT:
544 return createNonSequential(path, newData, acl, createMode);
545
546 case EPHEMERAL_SEQUENTIAL:
547 case PERSISTENT_SEQUENTIAL:
548 return createSequential(path, newData, acl, createMode);
549
550 default:
551 throw new IllegalArgumentException("Unrecognized CreateMode: " +
552 createMode);
553 }
554 } finally {
555 if (traceScope != null) traceScope.close();
556 }
557 }
558
559 private String createNonSequential(String path, byte[] data, List<ACL> acl,
560 CreateMode createMode) throws KeeperException, InterruptedException {
561 RetryCounter retryCounter = retryCounterFactory.create();
562 boolean isRetry = false;
563 while (true) {
564 try {
565 return checkZk().create(path, data, acl, createMode);
566 } catch (KeeperException e) {
567 switch (e.code()) {
568 case NODEEXISTS:
569 if (isRetry) {
570
571
572
573 byte[] currentData = checkZk().getData(path, false, null);
574 if (currentData != null &&
575 Bytes.compareTo(currentData, data) == 0) {
576
577 return path;
578 }
579 LOG.error("Node " + path + " already exists with " +
580 Bytes.toStringBinary(currentData) + ", could not write " +
581 Bytes.toStringBinary(data));
582 throw e;
583 }
584 LOG.debug("Node " + path + " already exists");
585 throw e;
586
587 case CONNECTIONLOSS:
588 case OPERATIONTIMEOUT:
589 retryOrThrow(retryCounter, e, "create");
590 break;
591
592 default:
593 throw e;
594 }
595 }
596 retryCounter.sleepUntilNextRetry();
597 isRetry = true;
598 }
599 }
600
601 private String createSequential(String path, byte[] data,
602 List<ACL> acl, CreateMode createMode)
603 throws KeeperException, InterruptedException {
604 RetryCounter retryCounter = retryCounterFactory.create();
605 boolean first = true;
606 String newPath = path+this.identifier;
607 while (true) {
608 try {
609 if (!first) {
610
611 String previousResult = findPreviousSequentialNode(newPath);
612 if (previousResult != null) {
613 return previousResult;
614 }
615 }
616 first = false;
617 return checkZk().create(newPath, data, acl, createMode);
618 } catch (KeeperException e) {
619 switch (e.code()) {
620 case CONNECTIONLOSS:
621 case OPERATIONTIMEOUT:
622 retryOrThrow(retryCounter, e, "create");
623 break;
624
625 default:
626 throw e;
627 }
628 }
629 retryCounter.sleepUntilNextRetry();
630 }
631 }
632
633
634
635
636 private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
637 throws UnsupportedOperationException {
638 if(ops == null) return null;
639
640 List<Op> preparedOps = new LinkedList<Op>();
641 for (Op op : ops) {
642 if (op.getType() == ZooDefs.OpCode.create) {
643 CreateRequest create = (CreateRequest)op.toRequestRecord();
644 preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
645 create.getAcl(), create.getFlags()));
646 } else if (op.getType() == ZooDefs.OpCode.delete) {
647
648 preparedOps.add(op);
649 } else if (op.getType() == ZooDefs.OpCode.setData) {
650 SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
651 preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
652 setData.getVersion()));
653 } else {
654 throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
655 }
656 }
657 return preparedOps;
658 }
659
660
661
662
663 public List<OpResult> multi(Iterable<Op> ops)
664 throws KeeperException, InterruptedException {
665 TraceScope traceScope = null;
666 try {
667 traceScope = Trace.startSpan("RecoverableZookeeper.multi");
668 RetryCounter retryCounter = retryCounterFactory.create();
669 Iterable<Op> multiOps = prepareZKMulti(ops);
670 while (true) {
671 try {
672 return checkZk().multi(multiOps);
673 } catch (KeeperException e) {
674 switch (e.code()) {
675 case CONNECTIONLOSS:
676 case OPERATIONTIMEOUT:
677 retryOrThrow(retryCounter, e, "multi");
678 break;
679
680 default:
681 throw e;
682 }
683 }
684 retryCounter.sleepUntilNextRetry();
685 }
686 } finally {
687 if (traceScope != null) traceScope.close();
688 }
689 }
690
691 private String findPreviousSequentialNode(String path)
692 throws KeeperException, InterruptedException {
693 int lastSlashIdx = path.lastIndexOf('/');
694 assert(lastSlashIdx != -1);
695 String parent = path.substring(0, lastSlashIdx);
696 String nodePrefix = path.substring(lastSlashIdx+1);
697
698 List<String> nodes = checkZk().getChildren(parent, false);
699 List<String> matching = filterByPrefix(nodes, nodePrefix);
700 for (String node : matching) {
701 String nodePath = parent + "/" + node;
702 Stat stat = checkZk().exists(nodePath, false);
703 if (stat != null) {
704 return nodePath;
705 }
706 }
707 return null;
708 }
709
710 public byte[] removeMetaData(byte[] data) {
711 if(data == null || data.length == 0) {
712 return data;
713 }
714
715 byte magic = data[0];
716 if(magic != MAGIC) {
717 return data;
718 }
719
720 int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
721 int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
722 int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
723
724 byte[] newData = new byte[dataLength];
725 System.arraycopy(data, dataOffset, newData, 0, dataLength);
726 return newData;
727 }
728
729 private byte[] appendMetaData(byte[] data) {
730 if(data == null || data.length == 0){
731 return data;
732 }
733 byte[] salt = Bytes.toBytes(salter.nextLong());
734 int idLength = id.length + salt.length;
735 byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+idLength+data.length];
736 int pos = 0;
737 pos = Bytes.putByte(newData, pos, MAGIC);
738 pos = Bytes.putInt(newData, pos, idLength);
739 pos = Bytes.putBytes(newData, pos, id, 0, id.length);
740 pos = Bytes.putBytes(newData, pos, salt, 0, salt.length);
741 pos = Bytes.putBytes(newData, pos, data, 0, data.length);
742 return newData;
743 }
744
745 public synchronized long getSessionId() {
746 return zk == null ? -1 : zk.getSessionId();
747 }
748
749 public synchronized void close() throws InterruptedException {
750 if (zk != null) zk.close();
751 }
752
753 public synchronized States getState() {
754 return zk == null ? null : zk.getState();
755 }
756
757 public synchronized ZooKeeper getZooKeeper() {
758 return zk;
759 }
760
761 public synchronized byte[] getSessionPasswd() {
762 return zk == null ? null : zk.getSessionPasswd();
763 }
764
765 public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
766 checkZk().sync(path, null, null);
767 }
768
769
770
771
772
773
774
775
776
777
778 private static List<String> filterByPrefix(List<String> nodes,
779 String... prefixes) {
780 List<String> lockChildren = new ArrayList<String>();
781 for (String child : nodes){
782 for (String prefix : prefixes){
783 if (child.startsWith(prefix)){
784 lockChildren.add(child);
785 break;
786 }
787 }
788 }
789 return lockChildren;
790 }
791
792 public String getIdentifier() {
793 return identifier;
794 }
795 }