1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.TreeMap;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.Abortable;
35  import org.apache.hadoop.hbase.CompoundConfiguration;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.exceptions.DeserializationException;
38  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
39  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
40  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
41  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
42  import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.Pair;
45  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
46  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
47  import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
48  import org.apache.zookeeper.KeeperException;
49  
50  import com.google.protobuf.ByteString;
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  
72  
73  
74  
75  
76  
77  
78  @InterfaceAudience.Private
79  public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
80  
81    
82    private Map<String, ReplicationPeerZKImpl> peerClusters;
83    private final String tableCFsNodeName;
84  
85    private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
86  
87    public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
88        Abortable abortable) {
89      super(zk, conf, abortable);
90      this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
91      this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
92    }
93  
94    @Override
95    public void init() throws ReplicationException {
96      try {
97        if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
98          ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
99        }
100     } catch (KeeperException e) {
101       throw new ReplicationException("Could not initialize replication peers", e);
102     }
103     addExistingPeers();
104   }
105 
106   @Override
107   public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
108       throws ReplicationException {
109     try {
110       if (peerExists(id)) {
111         throw new IllegalArgumentException("Cannot add a peer with id=" + id
112             + " because that id already exists.");
113       }
114 
115       if(id.contains("-")){
116         throw new IllegalArgumentException("Found invalid peer name:" + id);
117       }
118 
119       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
120       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
121       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
122         toByteArray(peerConfig));
123       
124       
125       
126       
127       ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
128       String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
129       ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
130       listOfOps.add(op1);
131       listOfOps.add(op2);
132       listOfOps.add(op3);
133       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
134       
135     } catch (KeeperException e) {
136       throw new ReplicationException("Could not add peer with id=" + id
137           + ", peerConfif=>" + peerConfig, e);
138     }
139   }
140 
141   @Override
142   public void removePeer(String id) throws ReplicationException {
143     try {
144       if (!peerExists(id)) {
145         throw new IllegalArgumentException("Cannot remove peer with id=" + id
146             + " because that id does not exist.");
147       }
148       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
149     } catch (KeeperException e) {
150       throw new ReplicationException("Could not remove peer with id=" + id, e);
151     }
152   }
153 
154   @Override
155   public void enablePeer(String id) throws ReplicationException {
156     changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
157     LOG.info("peer " + id + " is enabled");
158   }
159 
160   @Override
161   public void disablePeer(String id) throws ReplicationException {
162     changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
163     LOG.info("peer " + id + " is disabled");
164   }
165 
166   @Override
167   public String getPeerTableCFsConfig(String id) throws ReplicationException {
168     try {
169       if (!peerExists(id)) {
170         throw new IllegalArgumentException("peer " + id + " doesn't exist");
171       }
172       try {
173         return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
174       } catch (Exception e) {
175         throw new ReplicationException(e);
176       }
177     } catch (KeeperException e) {
178       throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
179     }
180   }
181 
182   @Override
183   public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
184     try {
185       if (!peerExists(id)) {
186         throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
187             + " does not exist.");
188       }
189       String tableCFsZKNode = getTableCFsNode(id);
190       byte[] tableCFs = Bytes.toBytes(tableCFsStr);
191       if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
192         ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
193       } else {
194         ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
195       }
196       LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
197     } catch (KeeperException e) {
198       throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
199     }
200   }
201 
202   @Override
203   public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
204     ReplicationPeer replicationPeer = this.peerClusters.get(id);
205     if (replicationPeer == null) {
206       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
207     }
208     return replicationPeer.getTableCFs();
209   }
210 
211   @Override
212   public boolean getStatusOfPeer(String id) {
213     ReplicationPeer replicationPeer = this.peerClusters.get(id);
214     if (replicationPeer == null) {
215       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
216     }
217     return replicationPeer.getPeerState() == PeerState.ENABLED;
218   }
219 
220   @Override
221   public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
222     try {
223       if (!peerExists(id)) {
224         throw new IllegalArgumentException("peer " + id + " doesn't exist");
225       }
226       String peerStateZNode = getPeerStateNode(id);
227       try {
228         return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
229       } catch (KeeperException e) {
230         throw new ReplicationException(e);
231       } catch (DeserializationException e) {
232         throw new ReplicationException(e);
233       }
234     } catch (KeeperException e) {
235       throw new ReplicationException("Unable to get status of the peer with id=" + id +
236           " from backing store", e);
237     } catch (InterruptedException e) {
238       throw new ReplicationException(e);
239     }
240   }
241 
242   @Override
243   public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
244     Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
245     List<String> ids = null;
246     try {
247       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
248       for (String id : ids) {
249         ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
250         if (peerConfig == null) {
251           LOG.warn("Failed to get replication peer configuration of clusterid=" + id
252             + " znode content, continuing.");
253           continue;
254         }
255         peers.put(id, peerConfig);
256       }
257     } catch (KeeperException e) {
258       this.abortable.abort("Cannot get the list of peers ", e);
259     } catch (ReplicationException e) {
260       this.abortable.abort("Cannot get the list of peers ", e);
261     }
262     return peers;
263   }
264 
265   @Override
266   public ReplicationPeer getPeer(String peerId) {
267     return peerClusters.get(peerId);
268   }
269 
270   @Override
271   public Set<String> getPeerIds() {
272     return peerClusters.keySet(); 
273   }
274 
275   
276 
277 
278   @Override
279   public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
280       throws ReplicationException {
281     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
282     byte[] data = null;
283     try {
284       data = ZKUtil.getData(this.zookeeper, znode);
285     } catch (InterruptedException e) {
286       LOG.warn("Could not get configuration for peer because the thread " +
287           "was interrupted. peerId=" + peerId);
288       Thread.currentThread().interrupt();
289       return null;
290     } catch (KeeperException e) {
291       throw new ReplicationException("Error getting configuration for peer with id="
292           + peerId, e);
293     }
294     if (data == null) {
295       LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
296       return null;
297     }
298 
299     try {
300       return parsePeerFrom(data);
301     } catch (DeserializationException e) {
302       LOG.warn("Failed to parse cluster key from peerId=" + peerId
303           + ", specifically the content from the following znode: " + znode);
304       return null;
305     }
306   }
307 
308   @Override
309   public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
310       throws ReplicationException {
311     ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
312 
313     if (peerConfig == null) {
314       return null;
315     }
316 
317     Configuration otherConf = new Configuration(this.conf);
318     try {
319       if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
320         ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
321       }
322     } catch (IOException e) {
323       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
324       return null;
325     }
326 
327     if (!peerConfig.getConfiguration().isEmpty()) {
328       CompoundConfiguration compound = new CompoundConfiguration();
329       compound.add(otherConf);
330       compound.addStringMap(peerConfig.getConfiguration());
331       return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
332     }
333 
334     return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
335   }
336 
337   
338 
339 
340   @Override
341   public List<String> getAllPeerIds() {
342     List<String> ids = null;
343     try {
344       ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
345     } catch (KeeperException e) {
346       this.abortable.abort("Cannot get the list of peers ", e);
347     }
348     return ids;
349   }
350 
351   
352 
353 
354 
355   private void addExistingPeers() throws ReplicationException {
356     List<String> znodes = null;
357     try {
358       znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
359     } catch (KeeperException e) {
360       throw new ReplicationException("Error getting the list of peer clusters.", e);
361     }
362     if (znodes != null) {
363       for (String z : znodes) {
364         createAndAddPeer(z);
365       }
366     }
367   }
368 
369   @Override
370   public boolean peerAdded(String peerId) throws ReplicationException {
371     return createAndAddPeer(peerId);
372   }
373 
374   @Override
375   public void peerRemoved(String peerId) {
376     ReplicationPeer rp = this.peerClusters.get(peerId);
377     if (rp != null) {
378       ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
379     }
380   }
381 
382   
383 
384 
385 
386 
387   public boolean createAndAddPeer(String peerId) throws ReplicationException {
388     if (peerClusters == null) {
389       return false;
390     }
391     if (this.peerClusters.containsKey(peerId)) {
392       return false;
393     }
394 
395     ReplicationPeerZKImpl peer = null;
396     try {
397       peer = createPeer(peerId);
398     } catch (Exception e) {
399       throw new ReplicationException("Error adding peer with id=" + peerId, e);
400     }
401     if (peer == null) {
402       return false;
403     }
404     ReplicationPeerZKImpl previous =
405       ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
406     if (previous == null) {
407       LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
408     } else {
409       LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
410         ", new cluster=" + peer.getPeerConfig().getClusterKey());
411     }
412     return true;
413   }
414 
415   private String getTableCFsNode(String id) {
416     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
417   }
418 
419   private String getPeerStateNode(String id) {
420     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
421   }
422 
423   
424 
425 
426 
427 
428   private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
429       throws ReplicationException {
430     try {
431       if (!peerExists(id)) {
432         throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
433             + " does not exist.");
434       }
435       String peerStateZNode = getPeerStateNode(id);
436       byte[] stateBytes =
437           (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
438               : DISABLED_ZNODE_BYTES;
439       if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
440         ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
441       } else {
442         ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
443       }
444       LOG.info("Peer with id= " + id + " is now " + state.name());
445     } catch (KeeperException e) {
446       throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
447     }
448   }
449 
450   
451 
452 
453 
454 
455 
456   private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
457     Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
458     if (pair == null) {
459       return null;
460     }
461     Configuration peerConf = pair.getSecond();
462 
463     ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
464     try {
465       peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
466     } catch (KeeperException e) {
467       throw new ReplicationException("Error starting the peer state tracker for peerId=" +
468           peerId, e);
469     }
470 
471     try {
472       peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
473     } catch (KeeperException e) {
474       throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
475           peerId, e);
476     }
477 
478     return peer;
479   }
480 
481   
482 
483 
484 
485 
486   private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
487       throws DeserializationException {
488     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
489       int pblen = ProtobufUtil.lengthOfPBMagic();
490       ZooKeeperProtos.ReplicationPeer.Builder builder =
491           ZooKeeperProtos.ReplicationPeer.newBuilder();
492       ZooKeeperProtos.ReplicationPeer peer;
493       try {
494         ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
495         peer = builder.build();
496       } catch (IOException e) {
497         throw new DeserializationException(e);
498       }
499       return convert(peer);
500     } else {
501       if (bytes.length > 0) {
502         return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
503       }
504       return new ReplicationPeerConfig().setClusterKey("");
505     }
506   }
507 
508   private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
509     ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
510     if (peer.hasClusterkey()) {
511       peerConfig.setClusterKey(peer.getClusterkey());
512     }
513     if (peer.hasReplicationEndpointImpl()) {
514       peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
515     }
516 
517     for (BytesBytesPair pair : peer.getDataList()) {
518       peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
519     }
520 
521     for (NameStringPair pair : peer.getConfigurationList()) {
522       peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
523     }
524     return peerConfig;
525   }
526 
527   private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig  peerConfig) {
528     ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
529     if (peerConfig.getClusterKey() != null) {
530       builder.setClusterkey(peerConfig.getClusterKey());
531     }
532     if (peerConfig.getReplicationEndpointImpl() != null) {
533       builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
534     }
535 
536     for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
537       builder.addData(BytesBytesPair.newBuilder()
538         .setFirst(ByteString.copyFrom(entry.getKey()))
539         .setSecond(ByteString.copyFrom(entry.getValue()))
540           .build());
541     }
542 
543     for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
544       builder.addConfiguration(NameStringPair.newBuilder()
545         .setName(entry.getKey())
546         .setValue(entry.getValue())
547         .build());
548     }
549 
550     return builder.build();
551   }
552 
553   
554 
555 
556 
557 
558 
559   private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
560     byte[] bytes = convert(peerConfig).toByteArray();
561     return ProtobufUtil.prependPBMagic(bytes);
562   }
563 
564 
565 }