1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import org.apache.commons.lang.StringUtils;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.classification.InterfaceAudience;
25 import org.apache.hadoop.classification.InterfaceStability;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.ServerName;
29 import org.apache.hadoop.hbase.exceptions.DeserializationException;
30 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
31 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.hbase.util.Threads;
34 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent;
35 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent;
36 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData;
37 import org.apache.hadoop.security.SecurityUtil;
38 import org.apache.hadoop.security.authentication.util.KerberosUtil;
39 import org.apache.zookeeper.AsyncCallback;
40 import org.apache.zookeeper.CreateMode;
41 import org.apache.zookeeper.KeeperException;
42 import org.apache.zookeeper.KeeperException.NoNodeException;
43 import org.apache.zookeeper.Op;
44 import org.apache.zookeeper.Watcher;
45 import org.apache.zookeeper.ZooDefs.Ids;
46 import org.apache.zookeeper.ZooKeeper;
47 import org.apache.zookeeper.client.ZooKeeperSaslClient;
48 import org.apache.zookeeper.data.ACL;
49 import org.apache.zookeeper.data.Stat;
50 import org.apache.zookeeper.proto.CreateRequest;
51 import org.apache.zookeeper.proto.DeleteRequest;
52 import org.apache.zookeeper.proto.SetDataRequest;
53 import org.apache.zookeeper.server.ZooKeeperSaslServer;
54
55 import com.google.protobuf.InvalidProtocolBufferException;
56
57 import javax.security.auth.login.AppConfigurationEntry;
58 import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
59 import java.io.BufferedReader;
60 import java.io.IOException;
61 import java.io.InputStreamReader;
62 import java.io.PrintWriter;
63 import java.net.InetSocketAddress;
64 import java.net.Socket;
65 import java.util.ArrayList;
66 import java.util.Arrays;
67 import java.util.HashMap;
68 import java.util.LinkedList;
69 import java.util.List;
70 import java.util.Map;
71 import java.util.Properties;
72
73
74
75
76
77
78
79
80
81
82 @InterfaceAudience.Public
83 @InterfaceStability.Evolving
84 public class ZKUtil {
85 private static final Log LOG = LogFactory.getLog(ZKUtil.class);
86
87
88 public static final char ZNODE_PATH_SEPARATOR = '/';
89 private static int zkDumpConnectionTimeOut;
90
91
92
93
94
95
96
97
98
99
100
101
102 public static RecoverableZooKeeper connect(Configuration conf, Watcher watcher)
103 throws IOException {
104 Properties properties = ZKConfig.makeZKProps(conf);
105 String ensemble = ZKConfig.getZKQuorumServersString(properties);
106 return connect(conf, ensemble, watcher);
107 }
108
109 public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
110 Watcher watcher)
111 throws IOException {
112 return connect(conf, ensemble, watcher, null);
113 }
114
115 public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
116 Watcher watcher, final String identifier)
117 throws IOException {
118 if(ensemble == null) {
119 throw new IOException("Unable to determine ZooKeeper ensemble");
120 }
121 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
122 HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
123 if (LOG.isTraceEnabled()) {
124 LOG.debug(identifier + " opening connection to ZooKeeper ensemble=" + ensemble);
125 }
126 int retry = conf.getInt("zookeeper.recovery.retry", 3);
127 int retryIntervalMillis =
128 conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
129 zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
130 1000);
131 return new RecoverableZooKeeper(ensemble, timeout, watcher,
132 retry, retryIntervalMillis, identifier);
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public static void loginServer(Configuration conf, String keytabFileKey,
150 String userNameKey, String hostname) throws IOException {
151 login(conf, keytabFileKey, userNameKey, hostname,
152 ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
153 JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME);
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 public static void loginClient(Configuration conf, String keytabFileKey,
171 String userNameKey, String hostname) throws IOException {
172 login(conf, keytabFileKey, userNameKey, hostname,
173 ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
174 JaasConfiguration.CLIENT_KEYTAB_KERBEROS_CONFIG_NAME);
175 }
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193 private static void login(Configuration conf, String keytabFileKey,
194 String userNameKey, String hostname,
195 String loginContextProperty, String loginContextName)
196 throws IOException {
197 if (!isSecureZooKeeper(conf))
198 return;
199
200
201
202 if (System.getProperty("java.security.auth.login.config") != null)
203 return;
204
205
206 String keytabFilename = conf.get(keytabFileKey);
207 if (keytabFilename == null) {
208 LOG.warn("no keytab specified for: " + keytabFileKey);
209 return;
210 }
211
212 String principalConfig = conf.get(userNameKey, System.getProperty("user.name"));
213 String principalName = SecurityUtil.getServerPrincipal(principalConfig, hostname);
214
215
216
217
218 JaasConfiguration jaasConf = new JaasConfiguration(loginContextName,
219 principalName, keytabFilename);
220 javax.security.auth.login.Configuration.setConfiguration(jaasConf);
221 System.setProperty(loginContextProperty, loginContextName);
222 }
223
224
225
226
227 private static class JaasConfiguration extends javax.security.auth.login.Configuration {
228 private static final String SERVER_KEYTAB_KERBEROS_CONFIG_NAME =
229 "zookeeper-server-keytab-kerberos";
230 private static final String CLIENT_KEYTAB_KERBEROS_CONFIG_NAME =
231 "zookeeper-client-keytab-kerberos";
232
233 private static final Map<String, String> BASIC_JAAS_OPTIONS =
234 new HashMap<String,String>();
235 static {
236 String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
237 if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
238 BASIC_JAAS_OPTIONS.put("debug", "true");
239 }
240 }
241
242 private static final Map<String,String> KEYTAB_KERBEROS_OPTIONS =
243 new HashMap<String,String>();
244 static {
245 KEYTAB_KERBEROS_OPTIONS.put("doNotPrompt", "true");
246 KEYTAB_KERBEROS_OPTIONS.put("storeKey", "true");
247 KEYTAB_KERBEROS_OPTIONS.put("refreshKrb5Config", "true");
248 KEYTAB_KERBEROS_OPTIONS.putAll(BASIC_JAAS_OPTIONS);
249 }
250
251 private static final AppConfigurationEntry KEYTAB_KERBEROS_LOGIN =
252 new AppConfigurationEntry(KerberosUtil.getKrb5LoginModuleName(),
253 LoginModuleControlFlag.REQUIRED,
254 KEYTAB_KERBEROS_OPTIONS);
255
256 private static final AppConfigurationEntry[] KEYTAB_KERBEROS_CONF =
257 new AppConfigurationEntry[]{KEYTAB_KERBEROS_LOGIN};
258
259 private javax.security.auth.login.Configuration baseConfig;
260 private final String loginContextName;
261 private final boolean useTicketCache;
262 private final String keytabFile;
263 private final String principal;
264
265 public JaasConfiguration(String loginContextName, String principal) {
266 this(loginContextName, principal, null, true);
267 }
268
269 public JaasConfiguration(String loginContextName, String principal, String keytabFile) {
270 this(loginContextName, principal, keytabFile, keytabFile == null || keytabFile.length() == 0);
271 }
272
273 private JaasConfiguration(String loginContextName, String principal,
274 String keytabFile, boolean useTicketCache) {
275 try {
276 this.baseConfig = javax.security.auth.login.Configuration.getConfiguration();
277 } catch (SecurityException e) {
278 this.baseConfig = null;
279 }
280 this.loginContextName = loginContextName;
281 this.useTicketCache = useTicketCache;
282 this.keytabFile = keytabFile;
283 this.principal = principal;
284 LOG.info("JaasConfiguration loginContextName=" + loginContextName +
285 " principal=" + principal + " useTicketCache=" + useTicketCache +
286 " keytabFile=" + keytabFile);
287 }
288
289 @Override
290 public AppConfigurationEntry[] getAppConfigurationEntry(String appName) {
291 if (loginContextName.equals(appName)) {
292 if (!useTicketCache) {
293 KEYTAB_KERBEROS_OPTIONS.put("keyTab", keytabFile);
294 KEYTAB_KERBEROS_OPTIONS.put("useKeyTab", "true");
295 }
296 KEYTAB_KERBEROS_OPTIONS.put("principal", principal);
297 KEYTAB_KERBEROS_OPTIONS.put("useTicketCache", useTicketCache ? "true" : "false");
298 return KEYTAB_KERBEROS_CONF;
299 }
300 if (baseConfig != null) return baseConfig.getAppConfigurationEntry(appName);
301 return(null);
302 }
303 }
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319 public static String joinZNode(String prefix, String suffix) {
320 return prefix + ZNODE_PATH_SEPARATOR + suffix;
321 }
322
323
324
325
326
327
328 public static String getParent(String node) {
329 int idx = node.lastIndexOf(ZNODE_PATH_SEPARATOR);
330 return idx <= 0 ? null : node.substring(0, idx);
331 }
332
333
334
335
336
337
338 public static String getNodeName(String path) {
339 return path.substring(path.lastIndexOf("/")+1);
340 }
341
342
343
344
345
346
347
348 public static String getZooKeeperClusterKey(Configuration conf) {
349 return getZooKeeperClusterKey(conf, null);
350 }
351
352
353
354
355
356
357
358
359 public static String getZooKeeperClusterKey(Configuration conf, String name) {
360 String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM.replaceAll(
361 "[\\t\\n\\x0B\\f\\r]", ""));
362 StringBuilder builder = new StringBuilder(ensemble);
363 builder.append(":");
364 builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
365 builder.append(":");
366 builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
367 if (name != null && !name.isEmpty()) {
368 builder.append(",");
369 builder.append(name);
370 }
371 return builder.toString();
372 }
373
374
375
376
377
378
379
380
381 public static void applyClusterKeyToConf(Configuration conf, String key)
382 throws IOException{
383 String[] parts = transformClusterKey(key);
384 conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
385 conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
386 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
387 }
388
389
390
391
392
393
394
395
396
397 public static String[] transformClusterKey(String key) throws IOException {
398 String[] parts = key.split(":");
399 if (parts.length != 3) {
400 throw new IOException("Cluster key invalid, the format should be:" +
401 HConstants.ZOOKEEPER_QUORUM + ":hbase.zookeeper.client.port:"
402 + HConstants.ZOOKEEPER_ZNODE_PARENT);
403 }
404 return parts;
405 }
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421 public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode)
422 throws KeeperException {
423 try {
424 Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
425 boolean exists = s != null ? true : false;
426 if (exists) {
427 LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode));
428 } else {
429 LOG.debug(zkw.prefix("Set watcher on znode that does not yet exist, " + znode));
430 }
431 return exists;
432 } catch (KeeperException e) {
433 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
434 zkw.keeperException(e);
435 return false;
436 } catch (InterruptedException e) {
437 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
438 zkw.interruptedException(e);
439 return false;
440 }
441 }
442
443
444
445
446
447
448
449
450
451
452
453 public static boolean setWatchIfNodeExists(ZooKeeperWatcher zkw, String znode)
454 throws KeeperException {
455 try {
456 zkw.getRecoverableZooKeeper().getData(znode, true, null);
457 return true;
458 } catch (NoNodeException e) {
459 return false;
460 } catch (InterruptedException e) {
461 LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
462 zkw.interruptedException(e);
463 return false;
464 }
465 }
466
467
468
469
470
471
472
473
474
475 public static int checkExists(ZooKeeperWatcher zkw, String znode)
476 throws KeeperException {
477 try {
478 Stat s = zkw.getRecoverableZooKeeper().exists(znode, null);
479 return s != null ? s.getVersion() : -1;
480 } catch (KeeperException e) {
481 LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
482 zkw.keeperException(e);
483 return -1;
484 } catch (InterruptedException e) {
485 LOG.warn(zkw.prefix("Unable to set watcher on znode (" + znode + ")"), e);
486 zkw.interruptedException(e);
487 return -1;
488 }
489 }
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511 public static List<String> listChildrenAndWatchForNewChildren(
512 ZooKeeperWatcher zkw, String znode)
513 throws KeeperException {
514 try {
515 List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
516 return children;
517 } catch(KeeperException.NoNodeException ke) {
518 LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
519 "because node does not exist (not an error)"));
520 return null;
521 } catch (KeeperException e) {
522 LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e);
523 zkw.keeperException(e);
524 return null;
525 } catch (InterruptedException e) {
526 LOG.warn(zkw.prefix("Unable to list children of znode " + znode + " "), e);
527 zkw.interruptedException(e);
528 return null;
529 }
530 }
531
532
533
534
535
536
537
538
539
540
541 public static List<String> listChildrenAndWatchThem(ZooKeeperWatcher zkw,
542 String znode) throws KeeperException {
543 List<String> children = listChildrenAndWatchForNewChildren(zkw, znode);
544 if (children == null) {
545 return null;
546 }
547 for (String child : children) {
548 watchAndCheckExists(zkw, joinZNode(znode, child));
549 }
550 return children;
551 }
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567 public static List<String> listChildrenNoWatch(ZooKeeperWatcher zkw, String znode)
568 throws KeeperException {
569 List<String> children = null;
570 try {
571
572 children = zkw.getRecoverableZooKeeper().getChildren(znode, null);
573 } catch(KeeperException.NoNodeException nne) {
574 return null;
575 } catch(InterruptedException ie) {
576 zkw.interruptedException(ie);
577 }
578 return children;
579 }
580
581
582
583
584
585 @Deprecated
586 public static class NodeAndData {
587 private String node;
588 private byte [] data;
589 public NodeAndData(String node, byte [] data) {
590 this.node = node;
591 this.data = data;
592 }
593 public String getNode() {
594 return node;
595 }
596 public byte [] getData() {
597 return data;
598 }
599 @Override
600 public String toString() {
601 return node;
602 }
603 public boolean isEmpty() {
604 return (data.length == 0);
605 }
606 }
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624 public static boolean nodeHasChildren(ZooKeeperWatcher zkw, String znode)
625 throws KeeperException {
626 try {
627 return !zkw.getRecoverableZooKeeper().getChildren(znode, null).isEmpty();
628 } catch(KeeperException.NoNodeException ke) {
629 LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
630 "because node does not exist (not an error)"));
631 return false;
632 } catch (KeeperException e) {
633 LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e);
634 zkw.keeperException(e);
635 return false;
636 } catch (InterruptedException e) {
637 LOG.warn(zkw.prefix("Unable to list children of znode " + znode), e);
638 zkw.interruptedException(e);
639 return false;
640 }
641 }
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656 public static int getNumberOfChildren(ZooKeeperWatcher zkw, String znode)
657 throws KeeperException {
658 try {
659 Stat stat = zkw.getRecoverableZooKeeper().exists(znode, null);
660 return stat == null ? 0 : stat.getNumChildren();
661 } catch(KeeperException e) {
662 LOG.warn(zkw.prefix("Unable to get children of node " + znode));
663 zkw.keeperException(e);
664 } catch(InterruptedException e) {
665 zkw.interruptedException(e);
666 }
667 return 0;
668 }
669
670
671
672
673
674
675
676
677
678
679 public static byte [] getData(ZooKeeperWatcher zkw, String znode)
680 throws KeeperException {
681 try {
682 byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, null);
683 logRetrievedMsg(zkw, znode, data, false);
684 return data;
685 } catch (KeeperException.NoNodeException e) {
686 LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " +
687 "because node does not exist (not an error)"));
688 return null;
689 } catch (KeeperException e) {
690 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
691 zkw.keeperException(e);
692 return null;
693 } catch (InterruptedException e) {
694 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
695 zkw.interruptedException(e);
696 return null;
697 }
698 }
699
700
701
702
703
704
705
706
707
708
709
710
711 public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode)
712 throws KeeperException {
713 return getDataInternal(zkw, znode, null, true);
714 }
715
716
717
718
719
720
721
722
723
724
725
726
727
728 public static byte[] getDataAndWatch(ZooKeeperWatcher zkw, String znode,
729 Stat stat) throws KeeperException {
730 return getDataInternal(zkw, znode, stat, true);
731 }
732
733 private static byte[] getDataInternal(ZooKeeperWatcher zkw, String znode, Stat stat,
734 boolean watcherSet)
735 throws KeeperException {
736 try {
737 byte [] data = zkw.getRecoverableZooKeeper().getData(znode, zkw, stat);
738 logRetrievedMsg(zkw, znode, data, watcherSet);
739 return data;
740 } catch (KeeperException.NoNodeException e) {
741
742
743 LOG.trace(zkw.prefix("Unable to get data of znode " + znode + " " +
744 "because node does not exist (not an error)"));
745 return null;
746 } catch (KeeperException e) {
747 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
748 zkw.keeperException(e);
749 return null;
750 } catch (InterruptedException e) {
751 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
752 zkw.interruptedException(e);
753 return null;
754 }
755 }
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772 public static byte [] getDataNoWatch(ZooKeeperWatcher zkw, String znode,
773 Stat stat)
774 throws KeeperException {
775 try {
776 byte [] data = zkw.getRecoverableZooKeeper().getData(znode, null, stat);
777 logRetrievedMsg(zkw, znode, data, false);
778 return data;
779 } catch (KeeperException.NoNodeException e) {
780 LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " +
781 "because node does not exist (not necessarily an error)"));
782 return null;
783 } catch (KeeperException e) {
784 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
785 zkw.keeperException(e);
786 return null;
787 } catch (InterruptedException e) {
788 LOG.warn(zkw.prefix("Unable to get data of znode " + znode), e);
789 zkw.interruptedException(e);
790 return null;
791 }
792 }
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811 public static List<NodeAndData> getChildDataAndWatchForNewChildren(
812 ZooKeeperWatcher zkw, String baseNode) throws KeeperException {
813 List<String> nodes =
814 ZKUtil.listChildrenAndWatchForNewChildren(zkw, baseNode);
815 List<NodeAndData> newNodes = new ArrayList<NodeAndData>();
816 if (nodes != null) {
817 for (String node : nodes) {
818 String nodePath = ZKUtil.joinZNode(baseNode, node);
819 byte[] data = ZKUtil.getDataAndWatch(zkw, nodePath);
820 newNodes.add(new NodeAndData(nodePath, data));
821 }
822 }
823 return newNodes;
824 }
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842 public static void updateExistingNodeData(ZooKeeperWatcher zkw, String znode,
843 byte [] data, int expectedVersion)
844 throws KeeperException {
845 try {
846 zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion);
847 } catch(InterruptedException ie) {
848 zkw.interruptedException(ie);
849 }
850 }
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876 public static boolean setData(ZooKeeperWatcher zkw, String znode,
877 byte [] data, int expectedVersion)
878 throws KeeperException, KeeperException.NoNodeException {
879 try {
880 return zkw.getRecoverableZooKeeper().setData(znode, data, expectedVersion) != null;
881 } catch (InterruptedException e) {
882 zkw.interruptedException(e);
883 return false;
884 }
885 }
886
887
888
889
890
891
892
893
894
895
896 public static void createSetData(final ZooKeeperWatcher zkw, final String znode,
897 final byte [] data)
898 throws KeeperException {
899 if (checkExists(zkw, znode) == -1) {
900 ZKUtil.createWithParents(zkw, znode, data);
901 } else {
902 ZKUtil.setData(zkw, znode, data);
903 }
904 }
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922 public static void setData(ZooKeeperWatcher zkw, String znode, byte [] data)
923 throws KeeperException, KeeperException.NoNodeException {
924 setData(zkw, (SetData)ZKUtilOp.setData(znode, data));
925 }
926
927 private static void setData(ZooKeeperWatcher zkw, SetData setData)
928 throws KeeperException, KeeperException.NoNodeException {
929 SetDataRequest sd = (SetDataRequest)toZooKeeperOp(zkw, setData).toRequestRecord();
930 setData(zkw, sd.getPath(), sd.getData(), sd.getVersion());
931 }
932
933
934
935
936
937
938 public static boolean isSecureZooKeeper(Configuration conf) {
939
940
941
942 if (System.getProperty("java.security.auth.login.config") != null)
943 return true;
944
945
946 return("kerberos".equalsIgnoreCase(conf.get("hbase.security.authentication")) &&
947 conf.get("hbase.zookeeper.client.keytab.file") != null);
948 }
949
950 private static ArrayList<ACL> createACL(ZooKeeperWatcher zkw, String node) {
951 if (isSecureZooKeeper(zkw.getConfiguration())) {
952
953
954 if ((node.equals(zkw.baseZNode) == true) ||
955 (node.equals(zkw.metaServerZNode) == true) ||
956 (node.equals(zkw.getMasterAddressZNode()) == true) ||
957 (node.equals(zkw.clusterIdZNode) == true) ||
958 (node.equals(zkw.rsZNode) == true) ||
959 (node.equals(zkw.backupMasterAddressesZNode) == true) ||
960 (node.startsWith(zkw.assignmentZNode) == true) ||
961 (node.startsWith(zkw.tableZNode) == true)) {
962 return ZooKeeperWatcher.CREATOR_ALL_AND_WORLD_READABLE;
963 }
964 return Ids.CREATOR_ALL_ACL;
965 } else {
966 return Ids.OPEN_ACL_UNSAFE;
967 }
968 }
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992 public static boolean createEphemeralNodeAndWatch(ZooKeeperWatcher zkw,
993 String znode, byte [] data)
994 throws KeeperException {
995 try {
996 zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
997 CreateMode.EPHEMERAL);
998 } catch (KeeperException.NodeExistsException nee) {
999 if(!watchAndCheckExists(zkw, znode)) {
1000
1001 return createEphemeralNodeAndWatch(zkw, znode, data);
1002 }
1003 return false;
1004 } catch (InterruptedException e) {
1005 LOG.info("Interrupted", e);
1006 Thread.currentThread().interrupt();
1007 }
1008 return true;
1009 }
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031 public static boolean createNodeIfNotExistsAndWatch(
1032 ZooKeeperWatcher zkw, String znode, byte [] data)
1033 throws KeeperException {
1034 try {
1035 zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
1036 CreateMode.PERSISTENT);
1037 } catch (KeeperException.NodeExistsException nee) {
1038 try {
1039 zkw.getRecoverableZooKeeper().exists(znode, zkw);
1040 } catch (InterruptedException e) {
1041 zkw.interruptedException(e);
1042 return false;
1043 }
1044 return false;
1045 } catch (InterruptedException e) {
1046 zkw.interruptedException(e);
1047 return false;
1048 }
1049 return true;
1050 }
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066 public static String createNodeIfNotExistsNoWatch(ZooKeeperWatcher zkw, String znode,
1067 byte[] data, CreateMode createMode) throws KeeperException {
1068
1069 String createdZNode = null;
1070 try {
1071 createdZNode = zkw.getRecoverableZooKeeper().create(znode, data,
1072 createACL(zkw, znode), createMode);
1073 } catch (KeeperException.NodeExistsException nee) {
1074 return znode;
1075 } catch (InterruptedException e) {
1076 zkw.interruptedException(e);
1077 return null;
1078 }
1079 return createdZNode;
1080 }
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098 public static int createAndWatch(ZooKeeperWatcher zkw,
1099 String znode, byte [] data)
1100 throws KeeperException, KeeperException.NodeExistsException {
1101 try {
1102 zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
1103 CreateMode.PERSISTENT);
1104 Stat stat = zkw.getRecoverableZooKeeper().exists(znode, zkw);
1105 if (stat == null){
1106
1107 throw KeeperException.create(KeeperException.Code.SYSTEMERROR,
1108 "ZK.exists returned null (i.e.: znode does not exist) for znode=" + znode);
1109 }
1110 return stat.getVersion();
1111 } catch (InterruptedException e) {
1112 zkw.interruptedException(e);
1113 return -1;
1114 }
1115 }
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132 public static void asyncCreate(ZooKeeperWatcher zkw,
1133 String znode, byte [] data, final AsyncCallback.StringCallback cb,
1134 final Object ctx) {
1135 zkw.getRecoverableZooKeeper().getZooKeeper().create(znode, data,
1136 createACL(zkw, znode), CreateMode.PERSISTENT, cb, ctx);
1137 }
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149 public static void createAndFailSilent(ZooKeeperWatcher zkw,
1150 String znode) throws KeeperException {
1151 createAndFailSilent(zkw, znode, new byte[0]);
1152 }
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165 public static void createAndFailSilent(ZooKeeperWatcher zkw,
1166 String znode, byte[] data)
1167 throws KeeperException {
1168 createAndFailSilent(zkw,
1169 (CreateAndFailSilent)ZKUtilOp.createAndFailSilent(znode, data));
1170 }
1171
1172 private static void createAndFailSilent(ZooKeeperWatcher zkw, CreateAndFailSilent cafs)
1173 throws KeeperException {
1174 CreateRequest create = (CreateRequest)toZooKeeperOp(zkw, cafs).toRequestRecord();
1175 String znode = create.getPath();
1176 try {
1177 RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
1178 if (zk.exists(znode, false) == null) {
1179 zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags()));
1180 }
1181 } catch(KeeperException.NodeExistsException nee) {
1182 } catch(KeeperException.NoAuthException nee){
1183 try {
1184 if (null == zkw.getRecoverableZooKeeper().exists(znode, false)) {
1185
1186 throw(nee);
1187 }
1188 } catch (InterruptedException ie) {
1189 zkw.interruptedException(ie);
1190 }
1191
1192 } catch(InterruptedException ie) {
1193 zkw.interruptedException(ie);
1194 }
1195 }
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208 public static void createWithParents(ZooKeeperWatcher zkw, String znode)
1209 throws KeeperException {
1210 createWithParents(zkw, znode, new byte[0]);
1211 }
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226 public static void createWithParents(ZooKeeperWatcher zkw, String znode, byte[] data)
1227 throws KeeperException {
1228 try {
1229 if(znode == null) {
1230 return;
1231 }
1232 zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
1233 CreateMode.PERSISTENT);
1234 } catch(KeeperException.NodeExistsException nee) {
1235 return;
1236 } catch(KeeperException.NoNodeException nne) {
1237 createWithParents(zkw, getParent(znode));
1238 createWithParents(zkw, znode, data);
1239 } catch(InterruptedException ie) {
1240 zkw.interruptedException(ie);
1241 }
1242 }
1243
1244
1245
1246
1247
1248
1249
1250
1251 public static void deleteNode(ZooKeeperWatcher zkw, String node)
1252 throws KeeperException {
1253 deleteNode(zkw, node, -1);
1254 }
1255
1256
1257
1258
1259
1260 public static boolean deleteNode(ZooKeeperWatcher zkw, String node,
1261 int version)
1262 throws KeeperException {
1263 try {
1264 zkw.getRecoverableZooKeeper().delete(node, version);
1265 return true;
1266 } catch(KeeperException.BadVersionException bve) {
1267 return false;
1268 } catch(InterruptedException ie) {
1269 zkw.interruptedException(ie);
1270 return false;
1271 }
1272 }
1273
1274
1275
1276
1277
1278
1279
1280 public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
1281 throws KeeperException {
1282 deleteNodeFailSilent(zkw,
1283 (DeleteNodeFailSilent)ZKUtilOp.deleteNodeFailSilent(node));
1284 }
1285
1286 private static void deleteNodeFailSilent(ZooKeeperWatcher zkw,
1287 DeleteNodeFailSilent dnfs) throws KeeperException {
1288 DeleteRequest delete = (DeleteRequest)toZooKeeperOp(zkw, dnfs).toRequestRecord();
1289 try {
1290 zkw.getRecoverableZooKeeper().delete(delete.getPath(), delete.getVersion());
1291 } catch(KeeperException.NoNodeException nne) {
1292 } catch(InterruptedException ie) {
1293 zkw.interruptedException(ie);
1294 }
1295 }
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306 public static void deleteNodeRecursively(ZooKeeperWatcher zkw, String node)
1307 throws KeeperException {
1308 try {
1309 List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
1310
1311 if (children == null) return;
1312
1313 if(!children.isEmpty()) {
1314 for(String child : children) {
1315 deleteNodeRecursively(zkw, joinZNode(node, child));
1316 }
1317 }
1318 zkw.getRecoverableZooKeeper().delete(node, -1);
1319 } catch(InterruptedException ie) {
1320 zkw.interruptedException(ie);
1321 }
1322 }
1323
1324
1325
1326
1327
1328
1329
1330 public static void deleteChildrenRecursively(ZooKeeperWatcher zkw, String node)
1331 throws KeeperException {
1332 List<String> children = ZKUtil.listChildrenNoWatch(zkw, node);
1333 if (children == null || children.isEmpty()) return;
1334 for(String child : children) {
1335 deleteNodeRecursively(zkw, joinZNode(node, child));
1336 }
1337 }
1338
1339
1340
1341
1342
1343
1344 public abstract static class ZKUtilOp {
1345 private String path;
1346
1347 private ZKUtilOp(String path) {
1348 this.path = path;
1349 }
1350
1351
1352
1353
1354 public static ZKUtilOp createAndFailSilent(String path, byte[] data) {
1355 return new CreateAndFailSilent(path, data);
1356 }
1357
1358
1359
1360
1361 public static ZKUtilOp deleteNodeFailSilent(String path) {
1362 return new DeleteNodeFailSilent(path);
1363 }
1364
1365
1366
1367
1368 public static ZKUtilOp setData(String path, byte [] data) {
1369 return new SetData(path, data);
1370 }
1371
1372
1373
1374
1375 public String getPath() {
1376 return path;
1377 }
1378
1379
1380
1381
1382
1383 public static class CreateAndFailSilent extends ZKUtilOp {
1384 private byte [] data;
1385
1386 private CreateAndFailSilent(String path, byte [] data) {
1387 super(path);
1388 this.data = data;
1389 }
1390
1391 public byte[] getData() {
1392 return data;
1393 }
1394
1395 @Override
1396 public boolean equals(Object o) {
1397 if (this == o) return true;
1398 if (!(o instanceof CreateAndFailSilent)) return false;
1399
1400 CreateAndFailSilent op = (CreateAndFailSilent) o;
1401 return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
1402 }
1403
1404 @Override
1405 public int hashCode() {
1406 int ret = 17 + getPath().hashCode() * 31;
1407 return ret * 31 + Bytes.hashCode(data);
1408 }
1409 }
1410
1411
1412
1413
1414
1415 public static class DeleteNodeFailSilent extends ZKUtilOp {
1416 private DeleteNodeFailSilent(String path) {
1417 super(path);
1418 }
1419
1420 @Override
1421 public boolean equals(Object o) {
1422 if (this == o) return true;
1423 if (!(o instanceof DeleteNodeFailSilent)) return false;
1424
1425 return super.equals(o);
1426 }
1427
1428 @Override
1429 public int hashCode() {
1430 return getPath().hashCode();
1431 }
1432 }
1433
1434
1435
1436
1437 public static class SetData extends ZKUtilOp {
1438 private byte [] data;
1439
1440 private SetData(String path, byte [] data) {
1441 super(path);
1442 this.data = data;
1443 }
1444
1445 public byte[] getData() {
1446 return data;
1447 }
1448
1449 @Override
1450 public boolean equals(Object o) {
1451 if (this == o) return true;
1452 if (!(o instanceof SetData)) return false;
1453
1454 SetData op = (SetData) o;
1455 return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
1456 }
1457
1458 @Override
1459 public int hashCode() {
1460 int ret = getPath().hashCode();
1461 return ret * 31 + Bytes.hashCode(data);
1462 }
1463 }
1464 }
1465
1466
1467
1468
1469 private static Op toZooKeeperOp(ZooKeeperWatcher zkw, ZKUtilOp op)
1470 throws UnsupportedOperationException {
1471 if(op == null) return null;
1472
1473 if (op instanceof CreateAndFailSilent) {
1474 CreateAndFailSilent cafs = (CreateAndFailSilent)op;
1475 return Op.create(cafs.getPath(), cafs.getData(), createACL(zkw, cafs.getPath()),
1476 CreateMode.PERSISTENT);
1477 } else if (op instanceof DeleteNodeFailSilent) {
1478 DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
1479 return Op.delete(dnfs.getPath(), -1);
1480 } else if (op instanceof SetData) {
1481 SetData sd = (SetData)op;
1482 return Op.setData(sd.getPath(), sd.getData(), -1);
1483 } else {
1484 throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
1485 + op.getClass().getName());
1486 }
1487 }
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510 public static void multiOrSequential(ZooKeeperWatcher zkw, List<ZKUtilOp> ops,
1511 boolean runSequentialOnMultiFailure) throws KeeperException {
1512 if (ops == null) return;
1513 boolean useMulti = zkw.getConfiguration().getBoolean(HConstants.ZOOKEEPER_USEMULTI, false);
1514
1515 if (useMulti) {
1516 List<Op> zkOps = new LinkedList<Op>();
1517 for (ZKUtilOp op : ops) {
1518 zkOps.add(toZooKeeperOp(zkw, op));
1519 }
1520 try {
1521 zkw.getRecoverableZooKeeper().multi(zkOps);
1522 } catch (KeeperException ke) {
1523 switch (ke.code()) {
1524 case NODEEXISTS:
1525 case NONODE:
1526 case BADVERSION:
1527 case NOAUTH:
1528
1529
1530 if (runSequentialOnMultiFailure) {
1531 LOG.info("On call to ZK.multi, received exception: " + ke.toString() + "."
1532 + " Attempting to run operations sequentially because"
1533 + " runSequentialOnMultiFailure is: " + runSequentialOnMultiFailure + ".");
1534 processSequentially(zkw, ops);
1535 break;
1536 }
1537 default:
1538 throw ke;
1539 }
1540 } catch (InterruptedException ie) {
1541 zkw.interruptedException(ie);
1542 }
1543 } else {
1544
1545 processSequentially(zkw, ops);
1546 }
1547 }
1548
1549 private static void processSequentially(ZooKeeperWatcher zkw, List<ZKUtilOp> ops)
1550 throws KeeperException, NoNodeException {
1551 for (ZKUtilOp op : ops) {
1552 if (op instanceof CreateAndFailSilent) {
1553 createAndFailSilent(zkw, (CreateAndFailSilent) op);
1554 } else if (op instanceof DeleteNodeFailSilent) {
1555 deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) op);
1556 } else if (op instanceof SetData) {
1557 setData(zkw, (SetData) op);
1558 } else {
1559 throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
1560 + op.getClass().getName());
1561 }
1562 }
1563 }
1564
1565
1566
1567
1568
1569
1570 public static String dump(ZooKeeperWatcher zkw) {
1571 StringBuilder sb = new StringBuilder();
1572 try {
1573 sb.append("HBase is rooted at ").append(zkw.baseZNode);
1574 sb.append("\nActive master address: ");
1575 try {
1576 sb.append(MasterAddressTracker.getMasterAddress(zkw));
1577 } catch (IOException e) {
1578 sb.append("<<FAILED LOOKUP: " + e.getMessage() + ">>");
1579 }
1580 sb.append("\nBackup master addresses:");
1581 for (String child : listChildrenNoWatch(zkw,
1582 zkw.backupMasterAddressesZNode)) {
1583 sb.append("\n ").append(child);
1584 }
1585 sb.append("\nRegion server holding .META.: " + MetaRegionTracker.getMetaRegionLocation(zkw));
1586 sb.append("\nRegion servers:");
1587 for (String child : listChildrenNoWatch(zkw, zkw.rsZNode)) {
1588 sb.append("\n ").append(child);
1589 }
1590 try {
1591 getReplicationZnodesDump(zkw, sb);
1592 } catch (KeeperException ke) {
1593 LOG.warn("Couldn't get the replication znode dump", ke);
1594 }
1595 sb.append("\nQuorum Server Statistics:");
1596 String[] servers = zkw.getQuorum().split(",");
1597 for (String server : servers) {
1598 sb.append("\n ").append(server);
1599 try {
1600 String[] stat = getServerStats(server, ZKUtil.zkDumpConnectionTimeOut);
1601
1602 if (stat == null) {
1603 sb.append("[Error] invalid quorum server: " + server);
1604 break;
1605 }
1606
1607 for (String s : stat) {
1608 sb.append("\n ").append(s);
1609 }
1610 } catch (Exception e) {
1611 sb.append("\n ERROR: ").append(e.getMessage());
1612 }
1613 }
1614 } catch (KeeperException ke) {
1615 sb.append("\nFATAL ZooKeeper Exception!\n");
1616 sb.append("\n" + ke.getMessage());
1617 }
1618 return sb.toString();
1619 }
1620
1621 private static void getReplicationZnodesDump(ZooKeeperWatcher zkw, StringBuilder sb)
1622 throws KeeperException {
1623 String replicationZNodeName = zkw.getConfiguration().get("zookeeper.znode.replication",
1624 "replication");
1625 String replicationZnode = joinZNode(zkw.baseZNode, replicationZNodeName);
1626 if (ZKUtil.checkExists(zkw, replicationZnode) == -1) return;
1627
1628 List<String> stack = new LinkedList<String>();
1629 stack.add(replicationZnode);
1630 do {
1631 String znodeToProcess = stack.remove(stack.size() - 1);
1632 sb.append("\n").append(znodeToProcess).append(": ")
1633 .append(Bytes.toString(ZKUtil.getData(zkw, znodeToProcess)));
1634 for (String zNodeChild : ZKUtil.listChildrenNoWatch(zkw, znodeToProcess)) {
1635 stack.add(ZKUtil.joinZNode(znodeToProcess, zNodeChild));
1636 }
1637 } while (stack.size() > 0);
1638 }
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648 public static String[] getServerStats(String server, int timeout)
1649 throws IOException {
1650 String[] sp = server.split(":");
1651 if (sp == null || sp.length == 0) {
1652 return null;
1653 }
1654
1655 String host = sp[0];
1656 int port = sp.length > 1 ? Integer.parseInt(sp[1])
1657 : HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
1658
1659 Socket socket = new Socket();
1660 InetSocketAddress sockAddr = new InetSocketAddress(host, port);
1661 socket.connect(sockAddr, timeout);
1662
1663 socket.setSoTimeout(timeout);
1664 PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
1665 BufferedReader in = new BufferedReader(new InputStreamReader(
1666 socket.getInputStream()));
1667 out.println("stat");
1668 out.flush();
1669 ArrayList<String> res = new ArrayList<String>();
1670 while (true) {
1671 String line = in.readLine();
1672 if (line != null) {
1673 res.add(line);
1674 } else {
1675 break;
1676 }
1677 }
1678 socket.close();
1679 return res.toArray(new String[res.size()]);
1680 }
1681
1682 private static void logRetrievedMsg(final ZooKeeperWatcher zkw,
1683 final String znode, final byte [] data, final boolean watcherSet) {
1684 if (!LOG.isTraceEnabled()) return;
1685 LOG.trace(zkw.prefix("Retrieved " + ((data == null)? 0: data.length) +
1686 " byte(s) of data from znode " + znode +
1687 (watcherSet? " and set watcher; ": "; data=") +
1688 (data == null? "null": data.length == 0? "empty": (
1689 znode.startsWith(zkw.assignmentZNode)?
1690 ZKAssign.toString(data):
1691 znode.startsWith(zkw.metaServerZNode)?
1692 getServerNameOrEmptyString(data):
1693 znode.startsWith(zkw.backupMasterAddressesZNode)?
1694 getServerNameOrEmptyString(data):
1695 StringUtils.abbreviate(Bytes.toStringBinary(data), 32)))));
1696 }
1697
1698 private static String getServerNameOrEmptyString(final byte [] data) {
1699 try {
1700 return ServerName.parseFrom(data).toString();
1701 } catch (DeserializationException e) {
1702 return "";
1703 }
1704 }
1705
1706
1707
1708
1709
1710 public static void waitForBaseZNode(Configuration conf) throws IOException {
1711 LOG.info("Waiting until the base znode is available");
1712 String parentZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
1713 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
1714 ZooKeeper zk = new ZooKeeper(ZKConfig.getZKQuorumServersString(conf),
1715 conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1716 HConstants.DEFAULT_ZK_SESSION_TIMEOUT), EmptyWatcher.instance);
1717
1718 final int maxTimeMs = 10000;
1719 final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
1720
1721 KeeperException keeperEx = null;
1722 try {
1723 try {
1724 for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
1725 try {
1726 if (zk.exists(parentZNode, false) != null) {
1727 LOG.info("Parent znode exists: " + parentZNode);
1728 keeperEx = null;
1729 break;
1730 }
1731 } catch (KeeperException e) {
1732 keeperEx = e;
1733 }
1734 Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
1735 }
1736 } finally {
1737 zk.close();
1738 }
1739 } catch (InterruptedException ex) {
1740 Thread.currentThread().interrupt();
1741 }
1742
1743 if (keeperEx != null) {
1744 throw new IOException(keeperEx);
1745 }
1746 }
1747
1748
1749 public static byte[] blockUntilAvailable(
1750 final ZooKeeperWatcher zkw, final String znode, final long timeout)
1751 throws InterruptedException {
1752 if (timeout < 0) throw new IllegalArgumentException();
1753 if (zkw == null) throw new IllegalArgumentException();
1754 if (znode == null) throw new IllegalArgumentException();
1755
1756 byte[] data = null;
1757 boolean finished = false;
1758 final long endTime = System.currentTimeMillis() + timeout;
1759 while (!finished) {
1760 try {
1761 data = ZKUtil.getData(zkw, znode);
1762 } catch(KeeperException e) {
1763 LOG.warn("Unexpected exception handling blockUntilAvailable", e);
1764 }
1765
1766 if (data == null && (System.currentTimeMillis() +
1767 HConstants.SOCKET_RETRY_WAIT_MS < endTime)) {
1768 Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
1769 } else {
1770 finished = true;
1771 }
1772 }
1773
1774 return data;
1775 }
1776
1777
1778
1779
1780
1781
1782
1783
1784 public static KeeperException convert(final DeserializationException e) {
1785 KeeperException ke = new KeeperException.DataInconsistencyException();
1786 ke.initCause(e);
1787 return ke;
1788 }
1789
1790
1791
1792
1793
1794
1795 public static void logZKTree(ZooKeeperWatcher zkw, String root) {
1796 if (!LOG.isDebugEnabled()) return;
1797 LOG.debug("Current zk system:");
1798 String prefix = "|-";
1799 LOG.debug(prefix + root);
1800 try {
1801 logZKTree(zkw, root, prefix);
1802 } catch (KeeperException e) {
1803 throw new RuntimeException(e);
1804 }
1805 }
1806
1807
1808
1809
1810
1811
1812 protected static void logZKTree(ZooKeeperWatcher zkw, String root, String prefix) throws KeeperException {
1813 List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);
1814 if (children == null) return;
1815 for (String child : children) {
1816 LOG.debug(prefix + child);
1817 String node = ZKUtil.joinZNode(root.equals("/") ? "" : root, child);
1818 logZKTree(zkw, node, prefix + "---");
1819 }
1820 }
1821
1822
1823
1824
1825
1826
1827 public static byte[] positionToByteArray(final long position) {
1828 byte[] bytes = ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position)
1829 .build().toByteArray();
1830 return ProtobufUtil.prependPBMagic(bytes);
1831 }
1832
1833
1834
1835
1836
1837
1838 public static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
1839 if (ProtobufUtil.isPBMagicPrefix(bytes)) {
1840 int pblen = ProtobufUtil.lengthOfPBMagic();
1841 ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
1842 ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
1843 ZooKeeperProtos.ReplicationHLogPosition position;
1844 try {
1845 position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
1846 } catch (InvalidProtocolBufferException e) {
1847 throw new DeserializationException(e);
1848 }
1849 return position.getPosition();
1850 } else {
1851 if (bytes.length > 0) {
1852 return Bytes.toLong(bytes);
1853 }
1854 return 0;
1855 }
1856 }
1857 }