1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Abortable;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
34 import org.apache.hadoop.hbase.exceptions.DeserializationException;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
39 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.apache.zookeeper.KeeperException;
42 import org.apache.zookeeper.KeeperException.NodeExistsException;
43
44 @InterfaceAudience.Private
45 public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
46 private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
47
48 private final ReplicationPeerConfig peerConfig;
49 private final String id;
50 private volatile PeerState peerState;
51 private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
52 private final Configuration conf;
53
54 private PeerStateTracker peerStateTracker;
55 private TableCFsTracker tableCFsTracker;
56
57
58
59
60
61
62
63
64 public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
65 throws ReplicationException {
66 this.conf = conf;
67 this.peerConfig = peerConfig;
68 this.id = id;
69 }
70
71
72
73
74
75
76
77
78
79 public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
80 Map<TableName, List<String>> tableCFs) throws ReplicationException {
81 this.conf = conf;
82 this.peerConfig = peerConfig;
83 this.id = id;
84 this.tableCFs = tableCFs;
85 }
86
87
88
89
90
91
92
93
94 public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
95 throws KeeperException {
96 ensurePeerEnabled(zookeeper, peerStateNode);
97 this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
98 this.peerStateTracker.start();
99 try {
100 this.readPeerStateZnode();
101 } catch (DeserializationException e) {
102 throw ZKUtil.convert(e);
103 }
104 }
105
106 private void readPeerStateZnode() throws DeserializationException {
107 this.peerState =
108 isStateEnabled(this.peerStateTracker.getData(false))
109 ? PeerState.ENABLED
110 : PeerState.DISABLED;
111 }
112
113
114
115
116
117
118
119
120 public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
121 throws KeeperException {
122 this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
123 this);
124 this.tableCFsTracker.start();
125 this.readTableCFsZnode();
126 }
127
128 private void readTableCFsZnode() {
129 String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
130 this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
131 }
132
133 @Override
134 public PeerState getPeerState() {
135 return peerState;
136 }
137
138
139
140
141
142 @Override
143 public String getId() {
144 return id;
145 }
146
147
148
149
150
151 @Override
152 public ReplicationPeerConfig getPeerConfig() {
153 return peerConfig;
154 }
155
156
157
158
159
160 @Override
161 public Configuration getConfiguration() {
162 return conf;
163 }
164
165
166
167
168
169 @Override
170 public Map<TableName, List<String>> getTableCFs() {
171 return this.tableCFs;
172 }
173
174 @Override
175 public void abort(String why, Throwable e) {
176 LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
177 + " was aborted for the following reason(s):" + why, e);
178 }
179
180 @Override
181 public boolean isAborted() {
182
183
184 return false;
185 }
186
187 @Override
188 public void close() throws IOException {
189
190 }
191
192
193
194
195
196
197
198 public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
199 ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
200 return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
201 }
202
203
204
205
206
207
208 private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
209 throws DeserializationException {
210 ProtobufUtil.expectPBMagicPrefix(bytes);
211 int pblen = ProtobufUtil.lengthOfPBMagic();
212 ZooKeeperProtos.ReplicationState.Builder builder =
213 ZooKeeperProtos.ReplicationState.newBuilder();
214 ZooKeeperProtos.ReplicationState state;
215 try {
216 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
217 state = builder.build();
218 return state.getState();
219 } catch (IOException e) {
220 throw new DeserializationException(e);
221 }
222 }
223
224
225
226
227
228
229
230
231
232 private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
233 throws NodeExistsException, KeeperException {
234 if (ZKUtil.checkExists(zookeeper, path) == -1) {
235
236
237
238 ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
239 ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
240 return true;
241 }
242 return false;
243 }
244
245
246
247
248 public class PeerStateTracker extends ZooKeeperNodeTracker {
249
250 public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
251 Abortable abortable) {
252 super(watcher, peerStateZNode, abortable);
253 }
254
255 @Override
256 public synchronized void nodeDataChanged(String path) {
257 if (path.equals(node)) {
258 super.nodeDataChanged(path);
259 try {
260 readPeerStateZnode();
261 } catch (DeserializationException e) {
262 LOG.warn("Failed deserializing the content of " + path, e);
263 }
264 }
265 }
266 }
267
268
269
270
271 public class TableCFsTracker extends ZooKeeperNodeTracker {
272
273 public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
274 Abortable abortable) {
275 super(watcher, tableCFsZNode, abortable);
276 }
277
278 @Override
279 public synchronized void nodeCreated(String path) {
280 if (path.equals(node)) {
281 super.nodeCreated(path);
282 readTableCFsZnode();
283 }
284 }
285
286 @Override
287 public synchronized void nodeDataChanged(String path) {
288 if (path.equals(node)) {
289 super.nodeDataChanged(path);
290 }
291 }
292 }
293 }