001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.TreeMap;
028import java.util.concurrent.ConcurrentHashMap;
029import java.util.concurrent.ConcurrentMap;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.CompoundConfiguration;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
038import org.apache.hadoop.hbase.exceptions.DeserializationException;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
040import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
041import org.apache.hadoop.hbase.util.Pair;
042import org.apache.hadoop.hbase.zookeeper.ZKConfig;
043import org.apache.hadoop.hbase.zookeeper.ZKUtil;
044import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
045import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
046import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
047import org.apache.yetus.audience.InterfaceAudience;
048import org.apache.zookeeper.KeeperException;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
054 * peers znode contains a list of all peer replication clusters and the current replication state of
055 * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
056 * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
057 * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
058 * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
059 * For example:
060 *
061 *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
062 *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
063 *
064 * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
065 * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
066 * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
067 * ReplicationPeer.PeerStateTracker class. For example:
068 *
069 * /hbase/replication/peers/1/peer-state [Value: ENABLED]
070 *
071 * Each of these peer znodes has a child znode that indicates which data will be replicated
072 * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
073 * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
074 * class. For example:
075 *
076 * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
077 */
078@InterfaceAudience.Private
079public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
080
081  // Map of peer clusters keyed by their id
082  private Map<String, ReplicationPeerZKImpl> peerClusters;
083  private final ReplicationQueuesClient queuesClient;
084  private Abortable abortable;
085
086  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);
087
088  public ReplicationPeersZKImpl(final ZKWatcher zk, final Configuration conf,
089                                final ReplicationQueuesClient queuesClient, Abortable abortable) {
090    super(zk, conf, abortable);
091    this.abortable = abortable;
092    this.peerClusters = new ConcurrentHashMap<>();
093    this.queuesClient = queuesClient;
094  }
095
096  @Override
097  public void init() throws ReplicationException {
098    try {
099      if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
100        ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
101      }
102    } catch (KeeperException e) {
103      throw new ReplicationException("Could not initialize replication peers", e);
104    }
105    addExistingPeers();
106  }
107
108  @Override
109  public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
110      throws ReplicationException {
111    try {
112      if (peerExists(id)) {
113        throw new IllegalArgumentException("Cannot add a peer with id=" + id
114            + " because that id already exists.");
115      }
116
117      if(id.contains("-")){
118        throw new IllegalArgumentException("Found invalid peer name:" + id);
119      }
120
121      if (peerConfig.getClusterKey() != null) {
122        try {
123          ZKConfig.validateClusterKey(peerConfig.getClusterKey());
124        } catch (IOException ioe) {
125          throw new IllegalArgumentException(ioe.getMessage());
126        }
127      }
128
129      checkQueuesDeleted(id);
130
131      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
132
133      List<ZKUtilOp> listOfOps = new ArrayList<>(2);
134      ZKUtilOp op1 =
135          ZKUtilOp.createAndFailSilent(getPeerNode(id),
136            ReplicationPeerConfigUtil.toByteArray(peerConfig));
137      ZKUtilOp op2 =
138          ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES
139              : DISABLED_ZNODE_BYTES);
140      listOfOps.add(op1);
141      listOfOps.add(op2);
142      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
143    } catch (KeeperException e) {
144      throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>"
145          + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
146    }
147  }
148
149  @Override
150  public void unregisterPeer(String id) throws ReplicationException {
151    try {
152      if (!peerExists(id)) {
153        throw new IllegalArgumentException("Cannot remove peer with id=" + id
154            + " because that id does not exist.");
155      }
156      ZKUtil.deleteNodeRecursively(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id));
157    } catch (KeeperException e) {
158      throw new ReplicationException("Could not remove peer with id=" + id, e);
159    }
160  }
161
162  @Override
163  public void enablePeer(String id) throws ReplicationException {
164    changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
165    LOG.info("peer " + id + " is enabled");
166  }
167
168  @Override
169  public void disablePeer(String id) throws ReplicationException {
170    changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
171    LOG.info("peer " + id + " is disabled");
172  }
173
174  @Override
175  public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
176    try {
177      if (!peerExists(id)) {
178        throw new IllegalArgumentException("peer " + id + " doesn't exist");
179      }
180      try {
181        ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
182        if (rpc == null) {
183          throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
184        }
185        return rpc.getTableCFsMap();
186      } catch (Exception e) {
187        throw new ReplicationException(e);
188      }
189    } catch (KeeperException e) {
190      throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
191    }
192  }
193
194  @Override
195  public void setPeerTableCFsConfig(String id,
196                                    Map<TableName, ? extends Collection<String>>  tableCFs)
197      throws ReplicationException {
198    try {
199      if (!peerExists(id)) {
200        throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
201            + " does not exist.");
202      }
203      ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
204      if (rpc == null) {
205        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
206      }
207      rpc.setTableCFsMap(tableCFs);
208      ZKUtil.setData(this.zookeeper, getPeerNode(id),
209          ReplicationPeerConfigUtil.toByteArray(rpc));
210      LOG.info("Peer tableCFs with id= " + id + " is now " +
211        ReplicationPeerConfigUtil.convertToString(tableCFs));
212    } catch (KeeperException e) {
213      throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
214    }
215  }
216
217  @Override
218  public boolean getStatusOfPeer(String id) {
219    ReplicationPeer replicationPeer = this.peerClusters.get(id);
220    if (replicationPeer == null) {
221      throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
222    }
223    return replicationPeer.getPeerState() == PeerState.ENABLED;
224  }
225
226  @Override
227  public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
228    try {
229      if (!peerExists(id)) {
230        throw new IllegalArgumentException("peer " + id + " doesn't exist");
231      }
232      String peerStateZNode = getPeerStateNode(id);
233      try {
234        return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
235      } catch (KeeperException e) {
236        throw new ReplicationException(e);
237      } catch (DeserializationException e) {
238        throw new ReplicationException(e);
239      }
240    } catch (KeeperException e) {
241      throw new ReplicationException("Unable to get status of the peer with id=" + id +
242          " from backing store", e);
243    } catch (InterruptedException e) {
244      throw new ReplicationException(e);
245    }
246  }
247
248  @Override
249  public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
250    Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
251    List<String> ids = null;
252    try {
253      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
254      for (String id : ids) {
255        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
256        if (peerConfig == null) {
257          LOG.warn("Failed to get replication peer configuration of clusterid=" + id
258            + " znode content, continuing.");
259          continue;
260        }
261        peers.put(id, peerConfig);
262      }
263    } catch (KeeperException e) {
264      this.abortable.abort("Cannot get the list of peers ", e);
265    } catch (ReplicationException e) {
266      this.abortable.abort("Cannot get the list of peers ", e);
267    }
268    return peers;
269  }
270
271  @Override
272  public ReplicationPeer getConnectedPeer(String peerId) {
273    return peerClusters.get(peerId);
274  }
275
276  @Override
277  public Set<String> getConnectedPeerIds() {
278    return peerClusters.keySet(); // this is not thread-safe
279  }
280
281  /**
282   * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
283   */
284  @Override
285  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
286      throws ReplicationException {
287    String znode = getPeerNode(peerId);
288    byte[] data = null;
289    try {
290      data = ZKUtil.getData(this.zookeeper, znode);
291    } catch (InterruptedException e) {
292      LOG.warn("Could not get configuration for peer because the thread " +
293          "was interrupted. peerId=" + peerId);
294      Thread.currentThread().interrupt();
295      return null;
296    } catch (KeeperException e) {
297      throw new ReplicationException("Error getting configuration for peer with id="
298          + peerId, e);
299    }
300    if (data == null) {
301      LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
302      return null;
303    }
304
305    try {
306      return ReplicationPeerConfigUtil.parsePeerFrom(data);
307    } catch (DeserializationException e) {
308      LOG.warn("Failed to parse cluster key from peerId=" + peerId
309          + ", specifically the content from the following znode: " + znode);
310      return null;
311    }
312  }
313
314  @Override
315  public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
316      throws ReplicationException {
317    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
318
319    if (peerConfig == null) {
320      return null;
321    }
322
323    Configuration otherConf;
324    try {
325      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
326    } catch (IOException e) {
327      LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
328      return null;
329    }
330
331    if (!peerConfig.getConfiguration().isEmpty()) {
332      CompoundConfiguration compound = new CompoundConfiguration();
333      compound.add(otherConf);
334      compound.addStringMap(peerConfig.getConfiguration());
335      return new Pair<>(peerConfig, compound);
336    }
337
338    return new Pair<>(peerConfig, otherConf);
339  }
340
341  @Override
342  public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
343      throws ReplicationException {
344    ReplicationPeer peer = getConnectedPeer(id);
345    if (peer == null){
346      throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
347    }
348    ReplicationPeerConfig existingConfig = peer.getPeerConfig();
349    if (!isStringEquals(newConfig.getClusterKey(), existingConfig.getClusterKey())) {
350      throw new ReplicationException(
351          "Changing the cluster key on an existing peer is not allowed." + " Existing key '" +
352              existingConfig.getClusterKey() + "' does not match new key '" +
353              newConfig.getClusterKey() + "'");
354    }
355    if (!isStringEquals(newConfig.getReplicationEndpointImpl(),
356      existingConfig.getReplicationEndpointImpl())) {
357      throw new ReplicationException("Changing the replication endpoint implementation class " +
358          "on an existing peer is not allowed. Existing class '" +
359          existingConfig.getReplicationEndpointImpl() + "' does not match new class '" +
360          newConfig.getReplicationEndpointImpl() + "'");
361    }
362
363    // Update existingConfig's peer config and peer data with the new values, but don't touch config
364    // or data that weren't explicitly changed
365    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig);
366    builder.putAllConfiguration(newConfig.getConfiguration())
367        .putAllPeerData(newConfig.getPeerData())
368        .setReplicateAllUserTables(newConfig.replicateAllUserTables())
369        .setNamespaces(newConfig.getNamespaces()).setTableCFsMap(newConfig.getTableCFsMap())
370        .setExcludeNamespaces(newConfig.getExcludeNamespaces())
371        .setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap())
372        .setBandwidth(newConfig.getBandwidth());
373
374    try {
375      ZKUtil.setData(this.zookeeper, getPeerNode(id),
376          ReplicationPeerConfigUtil.toByteArray(builder.build()));
377    }
378    catch(KeeperException ke){
379      throw new ReplicationException("There was a problem trying to save changes to the " +
380          "replication peer " + id, ke);
381    }
382  }
383
384  /**
385   * List all registered peer clusters and set a watch on their znodes.
386   */
387  @Override
388  public List<String> getAllPeerIds() {
389    List<String> ids = null;
390    try {
391      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
392    } catch (KeeperException e) {
393      this.abortable.abort("Cannot get the list of peers ", e);
394    }
395    return ids;
396  }
397
398  /**
399   * A private method used during initialization. This method attempts to add all registered
400   * peer clusters. This method does not set a watch on the peer cluster znodes.
401   */
402  private void addExistingPeers() throws ReplicationException {
403    List<String> znodes = null;
404    try {
405      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
406    } catch (KeeperException e) {
407      throw new ReplicationException("Error getting the list of peer clusters.", e);
408    }
409    if (znodes != null) {
410      for (String z : znodes) {
411        createAndAddPeer(z);
412      }
413    }
414  }
415
416  @Override
417  public boolean peerConnected(String peerId) throws ReplicationException {
418    return createAndAddPeer(peerId);
419  }
420
421  @Override
422  public void peerDisconnected(String peerId) {
423    ReplicationPeer rp = this.peerClusters.get(peerId);
424    if (rp != null) {
425      ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
426    }
427  }
428
429  /**
430   * Attempt to connect to a new remote slave cluster.
431   * @param peerId a short that identifies the cluster
432   * @return true if a new connection was made, false if no new connection was made.
433   */
434  public boolean createAndAddPeer(String peerId) throws ReplicationException {
435    if (peerClusters == null) {
436      return false;
437    }
438    if (this.peerClusters.containsKey(peerId)) {
439      return false;
440    }
441
442    ReplicationPeerZKImpl peer = null;
443    try {
444      peer = createPeer(peerId);
445    } catch (Exception e) {
446      throw new ReplicationException("Error adding peer with id=" + peerId, e);
447    }
448    if (peer == null) {
449      return false;
450    }
451    ReplicationPeerZKImpl previous =
452      ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
453    if (previous == null) {
454      LOG.info("Added peer cluster=" + peer.getPeerConfig().getClusterKey());
455    } else {
456      LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
457        ", new cluster=" + peer.getPeerConfig().getClusterKey());
458    }
459    return true;
460  }
461
462  /**
463   * Update the state znode of a peer cluster.
464   * @param id
465   * @param state
466   */
467  private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
468      throws ReplicationException {
469    try {
470      if (!peerExists(id)) {
471        throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
472            + " does not exist.");
473      }
474      String peerStateZNode = getPeerStateNode(id);
475      byte[] stateBytes =
476          (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
477              : DISABLED_ZNODE_BYTES;
478      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
479        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
480      } else {
481        ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
482      }
483      LOG.info("Peer with id= " + id + " is now " + state.name());
484    } catch (KeeperException e) {
485      throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
486    }
487  }
488
489  /**
490   * Helper method to connect to a peer
491   * @param peerId peer's identifier
492   * @return object representing the peer
493   * @throws ReplicationException
494   */
495  private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
496    Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
497    if (pair == null) {
498      return null;
499    }
500    Configuration peerConf = pair.getSecond();
501
502    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
503        peerConf, peerId, pair.getFirst(), abortable);
504    try {
505      peer.startStateTracker(this.getPeerStateNode(peerId));
506    } catch (KeeperException e) {
507      throw new ReplicationException("Error starting the peer state tracker for peerId=" +
508          peerId, e);
509    }
510
511    try {
512      peer.startPeerConfigTracker(this.getPeerNode(peerId));
513    } catch (KeeperException e) {
514      throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
515          peerId, e);
516    }
517
518    return peer;
519  }
520
521  private void checkQueuesDeleted(String peerId) throws ReplicationException {
522    if (queuesClient == null) return;
523    try {
524      List<String> replicators = queuesClient.getListOfReplicators();
525      if (replicators == null || replicators.isEmpty()) {
526        return;
527      }
528      for (String replicator : replicators) {
529        List<String> queueIds = queuesClient.getAllQueues(replicator);
530        for (String queueId : queueIds) {
531          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
532          if (queueInfo.getPeerId().equals(peerId)) {
533            throw new ReplicationException("undeleted queue for peerId: " + peerId
534                + ", replicator: " + replicator + ", queueId: " + queueId);
535          }
536        }
537      }
538      // Check for hfile-refs queue
539      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
540          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
541        throw new ReplicationException("Undeleted queue for peerId: " + peerId
542            + ", found in hfile-refs node path " + hfileRefsZNode);
543      }
544    } catch (KeeperException e) {
545      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
546    }
547  }
548
549  /**
550   * For replication peer cluster key or endpoint class, null and empty string is same. So here
551   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
552   */
553  private boolean isStringEquals(String s1, String s2) {
554    if (StringUtils.isBlank(s1)) {
555      return StringUtils.isBlank(s2);
556    }
557    return s1.equals(s2);
558  }
559}