View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.TreeMap;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.HBaseConfiguration;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Abortable;
36  import org.apache.hadoop.hbase.CompoundConfiguration;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.exceptions.DeserializationException;
39  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
41  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
42  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
43  import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.Pair;
46  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
47  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48  import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
49  import org.apache.zookeeper.KeeperException;
50  
51  import com.google.protobuf.ByteString;
52  
53  /**
54   * This class provides an implementation of the ReplicationPeers interface using Zookeeper. The
55   * peers znode contains a list of all peer replication clusters and the current replication state of
56   * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
57   * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
58   * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
59   * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
60   * For example:
61   *
62   *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
63   *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
64   *
65   * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
66   * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
67   * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
68   * ReplicationPeer.PeerStateTracker class. For example:
69   *
70   * /hbase/replication/peers/1/peer-state [Value: ENABLED]
71   *
72   * Each of these peer znodes has a child znode that indicates which data will be replicated
73   * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
74   * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
75   * class. For example:
76   *
77   * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
78   */
79  @InterfaceAudience.Private
80  public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
81  
82    // Map of peer clusters keyed by their id
83    private Map<String, ReplicationPeerZKImpl> peerClusters;
84    private final String tableCFsNodeName;
85  
86    private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
87  
88    public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
89        Abortable abortable) {
90      super(zk, conf, abortable);
91      this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
92      this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
93    }
94  
95    @Override
96    public void init() throws ReplicationException {
97      try {
98        if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
99          ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
100       }
101     } catch (KeeperException e) {
102       throw new ReplicationException("Could not initialize replication peers", e);
103     }
104     addExistingPeers();
105   }
106 
107   @Override
108   public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
109       throws ReplicationException {
110     try {
111       if (peerExists(id)) {
112         throw new IllegalArgumentException("Cannot add a peer with id=" + id
113             + " because that id already exists.");
114       }
115 
116       if(id.contains("-") || id.equals(ReplicationQueuesZKImpl.RS_LOCK_ZNODE)) {
117         throw new IllegalArgumentException("Found invalid peer name:" + id);
118       }
119 
120       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
121       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
122       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
123         toByteArray(peerConfig));
124       // There is a race (if hbase.zookeeper.useMulti is false)
125       // b/w PeerWatcher and ReplicationZookeeper#add method to create the
126       // peer-state znode. This happens while adding a peer
127       // The peer state data is set as "ENABLED" by default.
128       ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
129       String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
130       ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
131       listOfOps.add(op1);
132       listOfOps.add(op2);
133       listOfOps.add(op3);
134       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
135       // A peer is enabled by default
136     } catch (KeeperException e) {
137       throw new ReplicationException("Could not add peer with id=" + id
138           + ", peerConfif=>" + peerConfig, e);
139     }
140   }
141 
142   @Override
143   public void removePeer(String id) throws ReplicationException {
144     try {
145       if (!peerExists(id)) {
146         throw new IllegalArgumentException("Cannot remove peer with id=" + id
147             + " because that id does not exist.");
148       }
149       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
150     } catch (KeeperException e) {
151       throw new ReplicationException("Could not remove peer with id=" + id, e);
152     }
153   }
154 
155   @Override
156   public void enablePeer(String id) throws ReplicationException {
157     changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
158     LOG.info("peer " + id + " is enabled");
159   }
160 
161   @Override
162   public void disablePeer(String id) throws ReplicationException {
163     changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
164     LOG.info("peer " + id + " is disabled");
165   }
166 
167   @Override
168   public String getPeerTableCFsConfig(String id) throws ReplicationException {
169     try {
170       if (!peerExists(id)) {
171         throw new IllegalArgumentException("peer " + id + " doesn't exist");
172       }
173       try {
174         return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
175       } catch (Exception e) {
176         throw new ReplicationException(e);
177       }
178     } catch (KeeperException e) {
179       throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
180     }
181   }
182 
183   @Override
184   public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
185     try {
186       if (!peerExists(id)) {
187         throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
188             + " does not exist.");
189       }
190       String tableCFsZKNode = getTableCFsNode(id);
191       byte[] tableCFs = Bytes.toBytes(tableCFsStr);
192       if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
193         ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
194       } else {
195         ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
196       }
197       LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
198     } catch (KeeperException e) {
199       throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
200     }
201   }
202 
203   @Override
204   public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
205     ReplicationPeer replicationPeer = this.peerClusters.get(id);
206     if (replicationPeer == null) {
207       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
208     }
209     return replicationPeer.getTableCFs();
210   }
211 
212   @Override
213   public boolean getStatusOfPeer(String id) {
214     ReplicationPeer replicationPeer = this.peerClusters.get(id);
215     if (replicationPeer == null) {
216       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
217     }
218     return replicationPeer.getPeerState() == PeerState.ENABLED;
219   }
220 
221   @Override
222   public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
223     try {
224       if (!peerExists(id)) {
225         throw new IllegalArgumentException("peer " + id + " doesn't exist");
226       }
227       String peerStateZNode = getPeerStateNode(id);
228       try {
229         return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
230       } catch (KeeperException e) {
231         throw new ReplicationException(e);
232       } catch (DeserializationException e) {
233         throw new ReplicationException(e);
234       }
235     } catch (KeeperException e) {
236       throw new ReplicationException("Unable to get status of the peer with id=" + id +
237           " from backing store", e);
238     } catch (InterruptedException e) {
239       throw new ReplicationException(e);
240     }
241   }
242 
243   @Override
244   public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
245     Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
246     List<String> ids = null;
247     try {
248       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
249       for (String id : ids) {
250         ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
251         if (peerConfig == null) {
252           LOG.warn("Failed to get replication peer configuration of clusterid=" + id
253             + " znode content, continuing.");
254           continue;
255         }
256         peers.put(id, peerConfig);
257       }
258     } catch (KeeperException e) {
259       this.abortable.abort("Cannot get the list of peers ", e);
260     } catch (ReplicationException e) {
261       this.abortable.abort("Cannot get the list of peers ", e);
262     }
263     return peers;
264   }
265 
266   @Override
267   public ReplicationPeer getPeer(String peerId) {
268     return peerClusters.get(peerId);
269   }
270 
271   @Override
272   public Set<String> getPeerIds() {
273     return peerClusters.keySet(); // this is not thread-safe
274   }
275 
276   /**
277    * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
278    */
279   @Override
280   public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
281       throws ReplicationException {
282     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
283     byte[] data = null;
284     try {
285       data = ZKUtil.getData(this.zookeeper, znode);
286     } catch (InterruptedException e) {
287       LOG.warn("Could not get configuration for peer because the thread " +
288           "was interrupted. peerId=" + peerId);
289       Thread.currentThread().interrupt();
290       return null;
291     } catch (KeeperException e) {
292       throw new ReplicationException("Error getting configuration for peer with id="
293           + peerId, e);
294     }
295     if (data == null) {
296       LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
297       return null;
298     }
299 
300     try {
301       return parsePeerFrom(data);
302     } catch (DeserializationException e) {
303       LOG.warn("Failed to parse cluster key from peerId=" + peerId
304           + ", specifically the content from the following znode: " + znode);
305       return null;
306     }
307   }
308 
309   @Override
310   public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
311       throws ReplicationException {
312     ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
313 
314     if (peerConfig == null) {
315       return null;
316     }
317 
318     Configuration otherConf;
319     try {
320       otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
321     } catch (IOException e) {
322       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
323       return null;
324     }
325 
326     if (!peerConfig.getConfiguration().isEmpty()) {
327       CompoundConfiguration compound = new CompoundConfiguration();
328       compound.add(otherConf);
329       compound.addStringMap(peerConfig.getConfiguration());
330       return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
331     }
332 
333     return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
334   }
335 
336   /**
337    * List all registered peer clusters and set a watch on their znodes.
338    */
339   @Override
340   public List<String> getAllPeerIds() {
341     List<String> ids = null;
342     try {
343       ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
344     } catch (KeeperException e) {
345       this.abortable.abort("Cannot get the list of peers ", e);
346     }
347     return ids;
348   }
349 
350   /**
351    * A private method used during initialization. This method attempts to add all registered
352    * peer clusters. This method does not set a watch on the peer cluster znodes.
353    */
354   private void addExistingPeers() throws ReplicationException {
355     List<String> znodes = null;
356     try {
357       znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
358     } catch (KeeperException e) {
359       throw new ReplicationException("Error getting the list of peer clusters.", e);
360     }
361     if (znodes != null) {
362       for (String z : znodes) {
363         createAndAddPeer(z);
364       }
365     }
366   }
367 
368   @Override
369   public boolean peerAdded(String peerId) throws ReplicationException {
370     return createAndAddPeer(peerId);
371   }
372 
373   @Override
374   public void peerRemoved(String peerId) {
375     ReplicationPeer rp = this.peerClusters.get(peerId);
376     if (rp != null) {
377       ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
378     }
379   }
380 
381   /**
382    * Attempt to connect to a new remote slave cluster.
383    * @param peerId a short that identifies the cluster
384    * @return true if a new connection was made, false if no new connection was made.
385    */
386   public boolean createAndAddPeer(String peerId) throws ReplicationException {
387     if (peerClusters == null) {
388       return false;
389     }
390     if (this.peerClusters.containsKey(peerId)) {
391       return false;
392     }
393 
394     ReplicationPeerZKImpl peer = null;
395     try {
396       peer = createPeer(peerId);
397     } catch (Exception e) {
398       throw new ReplicationException("Error adding peer with id=" + peerId, e);
399     }
400     if (peer == null) {
401       return false;
402     }
403     ReplicationPeerZKImpl previous =
404       ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
405     if (previous == null) {
406       LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
407     } else {
408       LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
409         ", new cluster=" + peer.getPeerConfig().getClusterKey());
410     }
411     return true;
412   }
413 
414   private String getTableCFsNode(String id) {
415     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
416   }
417 
418   private String getPeerStateNode(String id) {
419     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
420   }
421 
422   /**
423    * Update the state znode of a peer cluster.
424    * @param id
425    * @param state
426    */
427   private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
428       throws ReplicationException {
429     try {
430       if (!peerExists(id)) {
431         throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
432             + " does not exist.");
433       }
434       String peerStateZNode = getPeerStateNode(id);
435       byte[] stateBytes =
436           (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
437               : DISABLED_ZNODE_BYTES;
438       if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
439         ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
440       } else {
441         ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
442       }
443       LOG.info("Peer with id= " + id + " is now " + state.name());
444     } catch (KeeperException e) {
445       throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
446     }
447   }
448 
449   /**
450    * Helper method to connect to a peer
451    * @param peerId peer's identifier
452    * @return object representing the peer
453    * @throws ReplicationException
454    */
455   private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
456     Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
457     if (pair == null) {
458       return null;
459     }
460     Configuration peerConf = pair.getSecond();
461 
462     ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
463     try {
464       peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
465     } catch (KeeperException e) {
466       throw new ReplicationException("Error starting the peer state tracker for peerId=" +
467           peerId, e);
468     }
469 
470     try {
471       peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
472     } catch (KeeperException e) {
473       throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
474           peerId, e);
475     }
476 
477     return peer;
478   }
479 
480   /**
481    * @param bytes Content of a peer znode.
482    * @return ClusterKey parsed from the passed bytes.
483    * @throws DeserializationException
484    */
485   private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
486       throws DeserializationException {
487     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
488       int pblen = ProtobufUtil.lengthOfPBMagic();
489       ZooKeeperProtos.ReplicationPeer.Builder builder =
490           ZooKeeperProtos.ReplicationPeer.newBuilder();
491       ZooKeeperProtos.ReplicationPeer peer;
492       try {
493         ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
494         peer = builder.build();
495       } catch (IOException e) {
496         throw new DeserializationException(e);
497       }
498       return convert(peer);
499     } else {
500       if (bytes.length > 0) {
501         return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
502       }
503       return new ReplicationPeerConfig().setClusterKey("");
504     }
505   }
506 
507   private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
508     ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
509     if (peer.hasClusterkey()) {
510       peerConfig.setClusterKey(peer.getClusterkey());
511     }
512     if (peer.hasReplicationEndpointImpl()) {
513       peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
514     }
515 
516     for (BytesBytesPair pair : peer.getDataList()) {
517       peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
518     }
519 
520     for (NameStringPair pair : peer.getConfigurationList()) {
521       peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
522     }
523     return peerConfig;
524   }
525 
526   private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig  peerConfig) {
527     ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
528     if (peerConfig.getClusterKey() != null) {
529       builder.setClusterkey(peerConfig.getClusterKey());
530     }
531     if (peerConfig.getReplicationEndpointImpl() != null) {
532       builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
533     }
534 
535     for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
536       builder.addData(BytesBytesPair.newBuilder()
537         .setFirst(ByteString.copyFrom(entry.getKey()))
538         .setSecond(ByteString.copyFrom(entry.getValue()))
539           .build());
540     }
541 
542     for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
543       builder.addConfiguration(NameStringPair.newBuilder()
544         .setName(entry.getKey())
545         .setValue(entry.getValue())
546         .build());
547     }
548 
549     return builder.build();
550   }
551 
552   /**
553    * @param peerConfig
554    * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
555    *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
556    *         /hbase/replication/peers/PEER_ID
557    */
558   private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
559     byte[] bytes = convert(peerConfig).toByteArray();
560     return ProtobufUtil.prependPBMagic(bytes);
561   }
562 
563 }