1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import com.google.protobuf.InvalidProtocolBufferException;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.classification.InterfaceAudience;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.Abortable;
27 import org.apache.hadoop.hbase.Server;
28 import org.apache.hadoop.hbase.ServerName;
29 import org.apache.hadoop.hbase.exceptions.DeserializationException;
30 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
31 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
34 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
35 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
36 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
37 import org.apache.zookeeper.KeeperException;
38 import org.apache.zookeeper.KeeperException.ConnectionLossException;
39 import org.apache.zookeeper.KeeperException.NodeExistsException;
40 import org.apache.zookeeper.KeeperException.SessionExpiredException;
41
42 import java.io.Closeable;
43 import java.io.IOException;
44 import java.util.ArrayList;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.SortedMap;
50 import java.util.SortedSet;
51 import java.util.TreeMap;
52 import java.util.UUID;
53 import java.util.concurrent.atomic.AtomicBoolean;
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @InterfaceAudience.Private
85 public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
86 private static final Log LOG =
87 LogFactory.getLog(ReplicationZookeeper.class);
88
89
90 private final ZooKeeperWatcher zookeeper;
91
92 private Map<String, ReplicationPeer> peerClusters;
93
94 private String replicationZNode;
95
96 private String peersZNode;
97
98 private String rsZNode;
99
100 private String rsServerNameZnode;
101
102 private String replicationStateNodeName;
103
104
105 private String peerStateNodeName;
106 private final Configuration conf;
107
108 private String ourClusterKey;
109
110 private Abortable abortable;
111 private final ReplicationStateInterface replicationState;
112 private final ReplicationQueues replicationQueues;
113
114
115
116
117
118 public static final byte[] ENABLED_ZNODE_BYTES =
119 toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
120
121
122
123
124 static final byte[] DISABLED_ZNODE_BYTES =
125 toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
126
127
128
129
130
131
132
/**
 * Builds an instance that is not tied to a region server: no peer map and no
 * replication queues are created, only the znode layout and the replication
 * state tracker.
 *
 * @param abortable notified through {@code abort()} when a ZooKeeper listing fails
 * @param conf configuration the znode names are read from
 * @param zk watcher on the local cluster's ZooKeeper ensemble
 * @throws KeeperException if the peers or rs znodes cannot be created
 */
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
    final ZooKeeperWatcher zk) throws KeeperException {
  super(zk, conf, abortable);
  // listPeersIdsAndWatch(), listPeers() and getRegisteredRegionServers() call
  // this.abortable.abort(...) on failure; without this assignment the field stays
  // null and those error paths throw NullPointerException instead of aborting.
  this.abortable = abortable;
  this.conf = conf;
  this.zookeeper = zk;
  setZNodes(abortable);
  this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
  // There is no region server behind this instance, hence no queues to manage.
  this.replicationQueues = null;
}
144
145
146
147
148
149
150
151
152
153 public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
154 throws IOException, KeeperException {
155 super(server.getZooKeeper(), server.getConfiguration(), server);
156 this.abortable = server;
157 this.zookeeper = server.getZooKeeper();
158 this.conf = server.getConfiguration();
159 setZNodes(server);
160
161 this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
162 this.peerClusters = new HashMap<String, ReplicationPeer>();
163 ZKUtil.createWithParents(this.zookeeper,
164 ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
165 this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
166 ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
167 this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
168 this.replicationQueues.init(server.getServerName().toString());
169 connectExistingPeers();
170 }
171
172 private void setZNodes(Abortable abortable) throws KeeperException {
173 String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
174 String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
175 this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
176 this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
177 String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
178 this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
179 this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
180 this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
181 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
182 this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
183 ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
184 }
185
186 private void connectExistingPeers() throws IOException, KeeperException {
187 List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
188 if (znodes != null) {
189 for (String z : znodes) {
190 connectToPeer(z);
191 }
192 }
193 }
194
195
196
197
198
199 public List<String> listPeersIdsAndWatch() {
200 List<String> ids = null;
201 try {
202 ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
203 } catch (KeeperException e) {
204 this.abortable.abort("Cannot get the list of peers ", e);
205 }
206 return ids;
207 }
208
209
210
211
212
213 public Map<String,String> listPeers() {
214 Map<String,String> peers = new TreeMap<String,String>();
215 List<String> ids = null;
216 try {
217 ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
218 for (String id : ids) {
219 byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
220 String clusterKey = null;
221 try {
222 clusterKey = parsePeerFrom(bytes);
223 } catch (DeserializationException de) {
224 LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
225 continue;
226 }
227 peers.put(id, clusterKey);
228 }
229 } catch (KeeperException e) {
230 this.abortable.abort("Cannot get the list of peers ", e);
231 }
232 return peers;
233 }
234
235
236
237
238
239
240
241 public List<ServerName> getSlavesAddresses(String peerClusterId) {
242 if (this.peerClusters.size() == 0) {
243 return Collections.emptyList();
244 }
245 ReplicationPeer peer = this.peerClusters.get(peerClusterId);
246 if (peer == null) {
247 return Collections.emptyList();
248 }
249
250 List<ServerName> addresses;
251 try {
252 addresses = fetchSlavesAddresses(peer.getZkw());
253 } catch (KeeperException ke) {
254 reconnectPeer(ke, peer);
255 addresses = Collections.emptyList();
256 }
257 peer.setRegionServers(addresses);
258 return peer.getRegionServers();
259 }
260
261
262
263
264
265
266
267 private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
268 throws KeeperException {
269 return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
270 }
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289 public static List<ServerName> listChildrenAndGetAsServerNames(
290 ZooKeeperWatcher zkw, String znode)
291 throws KeeperException {
292 List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
293 if(children == null) {
294 return Collections.emptyList();
295 }
296 List<ServerName> addresses = new ArrayList<ServerName>(children.size());
297 for (String child : children) {
298 addresses.add(ServerName.parseServerName(child));
299 }
300 return addresses;
301 }
302
303
304
305
306
307
308
309 public boolean connectToPeer(String peerId)
310 throws IOException, KeeperException {
311 if (peerClusters == null) {
312 return false;
313 }
314 if (this.peerClusters.containsKey(peerId)) {
315 return false;
316 }
317 ReplicationPeer peer = getPeer(peerId);
318 if (peer == null) {
319 return false;
320 }
321 this.peerClusters.put(peerId, peer);
322 ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
323 this.rsServerNameZnode, peerId));
324 LOG.info("Added new peer cluster " + peer.getClusterKey());
325 return true;
326 }
327
328
329
330
331
332
333
334
335 public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
336 String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
337 byte [] data = ZKUtil.getData(this.zookeeper, znode);
338 String otherClusterKey = "";
339 try {
340 otherClusterKey = parsePeerFrom(data);
341 } catch (DeserializationException e) {
342 LOG.warn("Failed parse of cluster key from peerId=" + peerId
343 + ", specifically the content from the following znode: " + znode);
344 }
345 if (this.ourClusterKey.equals(otherClusterKey)) {
346 LOG.debug("Not connecting to " + peerId + " because it's us");
347 return null;
348 }
349
350 Configuration otherConf = new Configuration(this.conf);
351 try {
352 ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
353 } catch (IOException e) {
354 LOG.error("Can't get peer because:", e);
355 return null;
356 }
357
358 ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
359 otherClusterKey);
360 peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
361 return peer;
362 }
363
364
365
366
367
368
369
370 public void removePeer(String id) throws IOException {
371 try {
372 if (!peerExists(id)) {
373 throw new IllegalArgumentException("Cannot remove inexisting peer");
374 }
375 ZKUtil.deleteNodeRecursively(this.zookeeper,
376 ZKUtil.joinZNode(this.peersZNode, id));
377 } catch (KeeperException e) {
378 throw new IOException("Unable to remove a peer", e);
379 }
380 }
381
382
383
384
385
386
387
388
389
390 public void addPeer(String id, String clusterKey) throws IOException {
391 try {
392 if (peerExists(id)) {
393 throw new IllegalArgumentException("Cannot add existing peer");
394 }
395 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
396 ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
397 toByteArray(clusterKey));
398
399
400
401 ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
402 ENABLED_ZNODE_BYTES);
403
404 } catch (KeeperException e) {
405 throw new IOException("Unable to add peer", e);
406 }
407 }
408
409
410
411
412
413
414
415 static byte[] toByteArray(final String clusterKey) {
416 byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
417 .toByteArray();
418 return ProtobufUtil.prependPBMagic(bytes);
419 }
420
421
422
423
424
425
426
427
428
429
430 static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
431 byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
432 .toByteArray();
433 return ProtobufUtil.prependPBMagic(bytes);
434 }
435
436
437
438
439
440
441
442 public static byte[] positionToByteArray(
443 final long position) {
444 return ZKUtil.positionToByteArray(position);
445 }
446
447
448
449
450
451
452
453 static byte[] lockToByteArray(
454 final String lockOwner) {
455 byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
456 .toByteArray();
457 return ProtobufUtil.prependPBMagic(bytes);
458 }
459
460
461
462
463
464
465 static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
466 if (ProtobufUtil.isPBMagicPrefix(bytes)) {
467 int pblen = ProtobufUtil.lengthOfPBMagic();
468 ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
469 .newBuilder();
470 ZooKeeperProtos.ReplicationPeer peer;
471 try {
472 peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
473 } catch (InvalidProtocolBufferException e) {
474 throw new DeserializationException(e);
475 }
476 return peer.getClusterkey();
477 } else {
478 if (bytes.length > 0) {
479 return Bytes.toString(bytes);
480 }
481 return "";
482 }
483 }
484
485
486
487
488
489
490 static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
491 throws DeserializationException {
492 ProtobufUtil.expectPBMagicPrefix(bytes);
493 int pblen = ProtobufUtil.lengthOfPBMagic();
494 ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
495 .newBuilder();
496 ZooKeeperProtos.ReplicationState state;
497 try {
498 state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
499 return state.getState();
500 } catch (InvalidProtocolBufferException e) {
501 throw new DeserializationException(e);
502 }
503 }
504
505
506
507
508
509
510 public static long parseHLogPositionFrom(
511 final byte[] bytes) throws DeserializationException {
512 return ZKUtil.parseHLogPositionFrom(bytes);
513 }
514
515
516
517
518
519
520 static String parseLockOwnerFrom(
521 final byte[] bytes) throws DeserializationException {
522 if (ProtobufUtil.isPBMagicPrefix(bytes)) {
523 int pblen = ProtobufUtil.lengthOfPBMagic();
524 ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
525 .newBuilder();
526 ZooKeeperProtos.ReplicationLock lock;
527 try {
528 lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
529 } catch (InvalidProtocolBufferException e) {
530 throw new DeserializationException(e);
531 }
532 return lock.getLockOwner();
533 } else {
534 if (bytes.length > 0) {
535 return Bytes.toString(bytes);
536 }
537 return "";
538 }
539 }
540
541
542
543
544
545
546
547
548 public void enablePeer(String id) throws IOException {
549 changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
550 LOG.info("peer " + id + " is enabled");
551 }
552
553
554
555
556
557
558
559
560 public void disablePeer(String id) throws IOException {
561 changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
562 LOG.info("peer " + id + " is disabled");
563 }
564
565 private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
566 throws IOException {
567 try {
568 if (!peerExists(id)) {
569 throw new IllegalArgumentException("peer " + id + " is not registered");
570 }
571 String peerStateZNode = getPeerStateNode(id);
572 byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
573 : DISABLED_ZNODE_BYTES;
574 if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
575 ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
576 } else {
577 ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
578 }
579 LOG.info("state of the peer " + id + " changed to " + state.name());
580 } catch (KeeperException e) {
581 throw new IOException("Unable to change state of the peer " + id, e);
582 }
583 }
584
585
586
587
588
589
590
591
592
593
594 public boolean getPeerEnabled(String id) {
595 if (!this.peerClusters.containsKey(id)) {
596 throw new IllegalArgumentException("peer " + id + " is not registered");
597 }
598 return this.peerClusters.get(id).getPeerEnabled().get();
599 }
600
601 private String getPeerStateNode(String id) {
602 return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
603 }
604
605
606
607
608
609
610
611 public boolean getReplication() throws KeeperException {
612 return this.replicationState.getState();
613 }
614
615
616
617
618
619
620 public void setReplication(boolean newState) throws KeeperException {
621 this.replicationState.setState(newState);
622 }
623
624
625
626
627
628
629 public void addLogToList(String filename, String peerId) throws KeeperException {
630 this.replicationQueues.addLog(peerId, filename);
631 }
632
633
634
635
636
637
638 public void removeLogFromList(String filename, String clusterId) {
639 this.replicationQueues.removeLog(clusterId, filename);
640 }
641
642
643
644
645
646
647
648 public void writeReplicationStatus(String filename, String clusterId, long position) {
649 this.replicationQueues.setLogPosition(clusterId, filename, position);
650 }
651
652
653
654
655
656
657 public List<String> getRegisteredRegionServers() {
658 List<String> result = null;
659 try {
660 result = ZKUtil.listChildrenAndWatchThem(
661 this.zookeeper, this.zookeeper.rsZNode);
662 } catch (KeeperException e) {
663 this.abortable.abort("Get list of registered region servers", e);
664 }
665 return result;
666 }
667
668
669
670
671
672
673
674
675 public SortedMap<String, SortedSet<String>> claimQueues(String regionserver) {
676 return this.replicationQueues.claimQueues(regionserver);
677 }
678
679
680
681
682
683 public void deleteSource(String peerZnode, boolean closeConnection) {
684 this.replicationQueues.removeQueue(peerZnode);
685 if (closeConnection) {
686 this.peerClusters.get(peerZnode).getZkw().close();
687 this.peerClusters.remove(peerZnode);
688 }
689 }
690
691
692
693
694 public void deleteOwnRSZNode() {
695 this.replicationQueues.removeAllQueues();
696 }
697
698
699
700
701
702
703
704
705 public long getHLogRepPosition(String peerId, String hlog) throws KeeperException {
706 return this.replicationQueues.getLogPosition(peerId, hlog);
707 }
708
709
710
711
712
713
714
715
716 public UUID getPeerUUID(String peerId) {
717 ReplicationPeer peer = getPeerClusters().get(peerId);
718 UUID peerUUID = null;
719 try {
720 peerUUID = getUUIDForCluster(peer.getZkw());
721 } catch (KeeperException ke) {
722 reconnectPeer(ke, peer);
723 }
724 return peerUUID;
725 }
726
727
728
729
730
731
732
733 public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
734 return UUID.fromString(ZKClusterId.readClusterIdZNode(zkw));
735 }
736
737 private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
738 if (ke instanceof ConnectionLossException
739 || ke instanceof SessionExpiredException) {
740 LOG.warn(
741 "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
742 ke);
743 try {
744 peer.reloadZkWatcher();
745 } catch(IOException io) {
746 LOG.warn(
747 "Creation of ZookeeperWatcher failed for peer "
748 + peer.getClusterKey(), io);
749 }
750 }
751 }
752
753 public void registerRegionServerListener(ZooKeeperListener listener) {
754 this.zookeeper.registerListener(listener);
755 }
756
757
758
759
760
761 public Map<String, ReplicationPeer> getPeerClusters() {
762 return this.peerClusters;
763 }
764
765
766
767
768
769
770 public boolean isPeerPath(String path) {
771 return path.split("/").length == peersZNode.split("/").length + 1;
772 }
773
774
775
776
777
778
779 public static String getZNodeName(String fullPath) {
780 String[] parts = fullPath.split("/");
781 return parts.length > 0 ? parts[parts.length-1] : "";
782 }
783
784
785
786
787
788 public ZooKeeperWatcher getZookeeperWatcher() {
789 return this.zookeeper;
790 }
791
792
793
794
795
796
797 public String getPeersZNode() {
798 return peersZNode;
799 }
800
801 @Override
802 public void close() throws IOException {
803 if (replicationState != null) replicationState.close();
804 }
805
806
807
808
809
810
811
812
813
814
815 static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
816 throws NodeExistsException, KeeperException {
817 if (ZKUtil.checkExists(zookeeper, path) == -1) {
818
819
820
821 ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
822 return true;
823 }
824 return false;
825 }
826
827
828
829
830
831
832
833 static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
834 ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
835 return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
836 }
837 }