1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.Abortable;
35 import org.apache.hadoop.hbase.CompoundConfiguration;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.exceptions.DeserializationException;
38 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
40 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
41 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
42 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.Pair;
45 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
46 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
47 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
48 import org.apache.zookeeper.KeeperException;
49
50 import com.google.protobuf.ByteString;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @InterfaceAudience.Private
79 public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
80
81
82 private Map<String, ReplicationPeerZKImpl> peerClusters;
83 private final String tableCFsNodeName;
84
85 private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
86
87 public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
88 Abortable abortable) {
89 super(zk, conf, abortable);
90 this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
91 this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
92 }
93
94 @Override
95 public void init() throws ReplicationException {
96 try {
97 if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
98 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
99 }
100 } catch (KeeperException e) {
101 throw new ReplicationException("Could not initialize replication peers", e);
102 }
103 addExistingPeers();
104 }
105
106 @Override
107 public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
108 throws ReplicationException {
109 try {
110 if (peerExists(id)) {
111 throw new IllegalArgumentException("Cannot add a peer with id=" + id
112 + " because that id already exists.");
113 }
114
115 if(id.contains("-")){
116 throw new IllegalArgumentException("Found invalid peer name:" + id);
117 }
118
119 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
120 List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
121 ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
122 toByteArray(peerConfig));
123
124
125
126
127 ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
128 String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
129 ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
130 listOfOps.add(op1);
131 listOfOps.add(op2);
132 listOfOps.add(op3);
133 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
134
135 } catch (KeeperException e) {
136 throw new ReplicationException("Could not add peer with id=" + id
137 + ", peerConfif=>" + peerConfig, e);
138 }
139 }
140
141 @Override
142 public void removePeer(String id) throws ReplicationException {
143 try {
144 if (!peerExists(id)) {
145 throw new IllegalArgumentException("Cannot remove peer with id=" + id
146 + " because that id does not exist.");
147 }
148 ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
149 } catch (KeeperException e) {
150 throw new ReplicationException("Could not remove peer with id=" + id, e);
151 }
152 }
153
154 @Override
155 public void enablePeer(String id) throws ReplicationException {
156 changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
157 LOG.info("peer " + id + " is enabled");
158 }
159
160 @Override
161 public void disablePeer(String id) throws ReplicationException {
162 changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
163 LOG.info("peer " + id + " is disabled");
164 }
165
166 @Override
167 public String getPeerTableCFsConfig(String id) throws ReplicationException {
168 try {
169 if (!peerExists(id)) {
170 throw new IllegalArgumentException("peer " + id + " doesn't exist");
171 }
172 try {
173 return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
174 } catch (Exception e) {
175 throw new ReplicationException(e);
176 }
177 } catch (KeeperException e) {
178 throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
179 }
180 }
181
182 @Override
183 public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
184 try {
185 if (!peerExists(id)) {
186 throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
187 + " does not exist.");
188 }
189 String tableCFsZKNode = getTableCFsNode(id);
190 byte[] tableCFs = Bytes.toBytes(tableCFsStr);
191 if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
192 ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
193 } else {
194 ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
195 }
196 LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
197 } catch (KeeperException e) {
198 throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
199 }
200 }
201
202 @Override
203 public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
204 ReplicationPeer replicationPeer = this.peerClusters.get(id);
205 if (replicationPeer == null) {
206 throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
207 }
208 return replicationPeer.getTableCFs();
209 }
210
211 @Override
212 public boolean getStatusOfPeer(String id) {
213 ReplicationPeer replicationPeer = this.peerClusters.get(id);
214 if (replicationPeer == null) {
215 throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
216 }
217 return replicationPeer.getPeerState() == PeerState.ENABLED;
218 }
219
220 @Override
221 public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
222 try {
223 if (!peerExists(id)) {
224 throw new IllegalArgumentException("peer " + id + " doesn't exist");
225 }
226 String peerStateZNode = getPeerStateNode(id);
227 try {
228 return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
229 } catch (KeeperException e) {
230 throw new ReplicationException(e);
231 } catch (DeserializationException e) {
232 throw new ReplicationException(e);
233 }
234 } catch (KeeperException e) {
235 throw new ReplicationException("Unable to get status of the peer with id=" + id +
236 " from backing store", e);
237 } catch (InterruptedException e) {
238 throw new ReplicationException(e);
239 }
240 }
241
242 @Override
243 public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
244 Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
245 List<String> ids = null;
246 try {
247 ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
248 for (String id : ids) {
249 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
250 if (peerConfig == null) {
251 LOG.warn("Failed to get replication peer configuration of clusterid=" + id
252 + " znode content, continuing.");
253 continue;
254 }
255 peers.put(id, peerConfig);
256 }
257 } catch (KeeperException e) {
258 this.abortable.abort("Cannot get the list of peers ", e);
259 } catch (ReplicationException e) {
260 this.abortable.abort("Cannot get the list of peers ", e);
261 }
262 return peers;
263 }
264
265 @Override
266 public ReplicationPeer getPeer(String peerId) {
267 return peerClusters.get(peerId);
268 }
269
270 @Override
271 public Set<String> getPeerIds() {
272 return peerClusters.keySet();
273 }
274
275
276
277
278 @Override
279 public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
280 throws ReplicationException {
281 String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
282 byte[] data = null;
283 try {
284 data = ZKUtil.getData(this.zookeeper, znode);
285 } catch (InterruptedException e) {
286 LOG.warn("Could not get configuration for peer because the thread " +
287 "was interrupted. peerId=" + peerId);
288 Thread.currentThread().interrupt();
289 return null;
290 } catch (KeeperException e) {
291 throw new ReplicationException("Error getting configuration for peer with id="
292 + peerId, e);
293 }
294 if (data == null) {
295 LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
296 return null;
297 }
298
299 try {
300 return parsePeerFrom(data);
301 } catch (DeserializationException e) {
302 LOG.warn("Failed to parse cluster key from peerId=" + peerId
303 + ", specifically the content from the following znode: " + znode);
304 return null;
305 }
306 }
307
308 @Override
309 public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
310 throws ReplicationException {
311 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
312
313 if (peerConfig == null) {
314 return null;
315 }
316
317 Configuration otherConf = new Configuration(this.conf);
318 try {
319 if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
320 ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
321 }
322 } catch (IOException e) {
323 LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
324 return null;
325 }
326
327 if (!peerConfig.getConfiguration().isEmpty()) {
328 CompoundConfiguration compound = new CompoundConfiguration();
329 compound.add(otherConf);
330 compound.addStringMap(peerConfig.getConfiguration());
331 return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
332 }
333
334 return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
335 }
336
337
338
339
340 @Override
341 public List<String> getAllPeerIds() {
342 List<String> ids = null;
343 try {
344 ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
345 } catch (KeeperException e) {
346 this.abortable.abort("Cannot get the list of peers ", e);
347 }
348 return ids;
349 }
350
351
352
353
354
355 private void addExistingPeers() throws ReplicationException {
356 List<String> znodes = null;
357 try {
358 znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
359 } catch (KeeperException e) {
360 throw new ReplicationException("Error getting the list of peer clusters.", e);
361 }
362 if (znodes != null) {
363 for (String z : znodes) {
364 createAndAddPeer(z);
365 }
366 }
367 }
368
369 @Override
370 public boolean peerAdded(String peerId) throws ReplicationException {
371 return createAndAddPeer(peerId);
372 }
373
374 @Override
375 public void peerRemoved(String peerId) {
376 ReplicationPeer rp = this.peerClusters.get(peerId);
377 if (rp != null) {
378 ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
379 }
380 }
381
382
383
384
385
386
387 public boolean createAndAddPeer(String peerId) throws ReplicationException {
388 if (peerClusters == null) {
389 return false;
390 }
391 if (this.peerClusters.containsKey(peerId)) {
392 return false;
393 }
394
395 ReplicationPeerZKImpl peer = null;
396 try {
397 peer = createPeer(peerId);
398 } catch (Exception e) {
399 throw new ReplicationException("Error adding peer with id=" + peerId, e);
400 }
401 if (peer == null) {
402 return false;
403 }
404 ReplicationPeerZKImpl previous =
405 ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
406 if (previous == null) {
407 LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
408 } else {
409 LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
410 ", new cluster=" + peer.getPeerConfig().getClusterKey());
411 }
412 return true;
413 }
414
415 private String getTableCFsNode(String id) {
416 return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
417 }
418
419 private String getPeerStateNode(String id) {
420 return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
421 }
422
423
424
425
426
427
428 private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
429 throws ReplicationException {
430 try {
431 if (!peerExists(id)) {
432 throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
433 + " does not exist.");
434 }
435 String peerStateZNode = getPeerStateNode(id);
436 byte[] stateBytes =
437 (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
438 : DISABLED_ZNODE_BYTES;
439 if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
440 ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
441 } else {
442 ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
443 }
444 LOG.info("Peer with id= " + id + " is now " + state.name());
445 } catch (KeeperException e) {
446 throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
447 }
448 }
449
450
451
452
453
454
455
456 private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
457 Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
458 if (pair == null) {
459 return null;
460 }
461 Configuration peerConf = pair.getSecond();
462
463 ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
464 try {
465 peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
466 } catch (KeeperException e) {
467 throw new ReplicationException("Error starting the peer state tracker for peerId=" +
468 peerId, e);
469 }
470
471 try {
472 peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
473 } catch (KeeperException e) {
474 throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
475 peerId, e);
476 }
477
478 return peer;
479 }
480
481
482
483
484
485
486 private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
487 throws DeserializationException {
488 if (ProtobufUtil.isPBMagicPrefix(bytes)) {
489 int pblen = ProtobufUtil.lengthOfPBMagic();
490 ZooKeeperProtos.ReplicationPeer.Builder builder =
491 ZooKeeperProtos.ReplicationPeer.newBuilder();
492 ZooKeeperProtos.ReplicationPeer peer;
493 try {
494 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
495 peer = builder.build();
496 } catch (IOException e) {
497 throw new DeserializationException(e);
498 }
499 return convert(peer);
500 } else {
501 if (bytes.length > 0) {
502 return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
503 }
504 return new ReplicationPeerConfig().setClusterKey("");
505 }
506 }
507
508 private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
509 ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
510 if (peer.hasClusterkey()) {
511 peerConfig.setClusterKey(peer.getClusterkey());
512 }
513 if (peer.hasReplicationEndpointImpl()) {
514 peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
515 }
516
517 for (BytesBytesPair pair : peer.getDataList()) {
518 peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
519 }
520
521 for (NameStringPair pair : peer.getConfigurationList()) {
522 peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
523 }
524 return peerConfig;
525 }
526
527 private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
528 ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
529 if (peerConfig.getClusterKey() != null) {
530 builder.setClusterkey(peerConfig.getClusterKey());
531 }
532 if (peerConfig.getReplicationEndpointImpl() != null) {
533 builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
534 }
535
536 for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
537 builder.addData(BytesBytesPair.newBuilder()
538 .setFirst(ByteString.copyFrom(entry.getKey()))
539 .setSecond(ByteString.copyFrom(entry.getValue()))
540 .build());
541 }
542
543 for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
544 builder.addConfiguration(NameStringPair.newBuilder()
545 .setName(entry.getKey())
546 .setValue(entry.getValue())
547 .build());
548 }
549
550 return builder.build();
551 }
552
553
554
555
556
557
558
559 private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
560 byte[] bytes = convert(peerConfig).toByteArray();
561 return ProtobufUtil.prependPBMagic(bytes);
562 }
563
564
565 }