1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import com.google.protobuf.InvalidProtocolBufferException;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.Abortable;
26 import org.apache.hadoop.hbase.exceptions.DeserializationException;
27 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
28 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
29 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
30 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
31 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32 import org.apache.zookeeper.KeeperException;
33
34 import java.io.IOException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37
38
39
40
41 public class ReplicationStateImpl extends ReplicationStateZKBase implements
42 ReplicationStateInterface {
43
44 private final ReplicationStateTracker stateTracker;
45 private final AtomicBoolean replicating;
46
47 private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
48
49 public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
50 final Abortable abortable, final AtomicBoolean replicating) {
51 super(zk, conf, abortable);
52 this.replicating = replicating;
53
54
55 this.stateTracker =
56 new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
57 stateTracker.start();
58 readReplicationStateZnode();
59 }
60
61 public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
62 final Abortable abortable) {
63 this(zk, conf, abortable, new AtomicBoolean());
64 }
65
66 @Override
67 public boolean getState() throws KeeperException {
68 return getReplication();
69 }
70
71 @Override
72 public void setState(boolean newState) throws KeeperException {
73 setReplicating(newState);
74 }
75
76 @Override
77 public void close() throws IOException {
78 if (stateTracker != null) stateTracker.stop();
79 }
80
81
82
83
84
85
86
87 private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
88 ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
89 return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
90 }
91
92
93
94
95
96
97 private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
98 throws DeserializationException {
99 ProtobufUtil.expectPBMagicPrefix(bytes);
100 int pblen = ProtobufUtil.lengthOfPBMagic();
101 ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
102 .newBuilder();
103 ZooKeeperProtos.ReplicationState state;
104 try {
105 state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
106 return state.getState();
107 } catch (InvalidProtocolBufferException e) {
108 throw new DeserializationException(e);
109 }
110 }
111
112
113
114
115
116 private void setReplicating(boolean newState) throws KeeperException {
117 ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
118 byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
119 : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
120 ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
121 }
122
123
124
125
126
127
128
129 private boolean getReplication() throws KeeperException {
130 byte[] data = this.stateTracker.getData(false);
131 if (data == null || data.length == 0) {
132 setReplicating(true);
133 return true;
134 }
135 try {
136 return isStateEnabled(data);
137 } catch (DeserializationException e) {
138 throw ZKUtil.convert(e);
139 }
140 }
141
142
143
144
145 private void readReplicationStateZnode() {
146 try {
147 this.replicating.set(getReplication());
148 LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
149 } catch (KeeperException e) {
150 this.abortable.abort("Failed getting data on from " + this.stateZNode, e);
151 }
152 }
153
154
155
156
157 private class ReplicationStateTracker extends ZooKeeperNodeTracker {
158 public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
159 super(watcher, stateZnode, abortable);
160 }
161
162 @Override
163 public synchronized void nodeDataChanged(String path) {
164 if (path.equals(node)) {
165 super.nodeDataChanged(path);
166 readReplicationStateZnode();
167 }
168 }
169 }
170 }