001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.Arrays;
021import java.util.Collections;
022import java.util.List;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
025import org.apache.hadoop.hbase.exceptions.DeserializationException;
026import org.apache.hadoop.hbase.zookeeper.ZKUtil;
027import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
028import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
029import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.zookeeper.KeeperException;
032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
033
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
035
036/**
037 * ZK based replication peer storage.
038 */
039@InterfaceAudience.Private
040public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
041    implements ReplicationPeerStorage {
042
043  public static final String PEERS_ZNODE = "zookeeper.znode.replication.peers";
044  public static final String PEERS_ZNODE_DEFAULT = "peers";
045
046  public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state";
047  public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
048
049  public static final byte[] ENABLED_ZNODE_BYTES =
050    toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
051  public static final byte[] DISABLED_ZNODE_BYTES =
052    toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
053
054  /**
055   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
056   * cluster.
057   */
058  private final String peerStateNodeName;
059
060  /**
061   * The name of the znode that contains a list of all remote slave (i.e. peer) clusters.
062   */
063  private final String peersZNode;
064
065  public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) {
066    super(zookeeper, conf);
067    this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, PEERS_STATE_ZNODE_DEFAULT);
068    String peersZNodeName = conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT);
069    this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
070  }
071
072  @VisibleForTesting
073  public String getPeerStateNode(String peerId) {
074    return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName);
075  }
076
077  @VisibleForTesting
078  public String getPeerNode(String peerId) {
079    return ZNodePaths.joinZNode(peersZNode, peerId);
080  }
081
082  @Override
083  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
084      throws ReplicationException {
085    try {
086      ZKUtil.createWithParents(zookeeper, peersZNode);
087      ZKUtil.multiOrSequential(zookeeper,
088        Arrays.asList(
089          ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
090            ReplicationPeerConfigUtil.toByteArray(peerConfig)),
091          ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
092            enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
093        false);
094    } catch (KeeperException e) {
095      throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>"
096          + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
097    }
098  }
099
100  @Override
101  public void removePeer(String peerId) throws ReplicationException {
102    try {
103      ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
104    } catch (KeeperException e) {
105      throw new ReplicationException("Could not remove peer with id=" + peerId, e);
106    }
107  }
108
109  @Override
110  public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
111    byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
112    try {
113      ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
114    } catch (KeeperException e) {
115      throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e);
116    }
117  }
118
119  @Override
120  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
121      throws ReplicationException {
122    try {
123      ZKUtil.setData(this.zookeeper, getPeerNode(peerId),
124        ReplicationPeerConfigUtil.toByteArray(peerConfig));
125    } catch (KeeperException e) {
126      throw new ReplicationException(
127          "There was a problem trying to save changes to the " + "replication peer " + peerId, e);
128    }
129  }
130
131  @Override
132  public List<String> listPeerIds() throws ReplicationException {
133    try {
134      List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, peersZNode);
135      return children != null ? children : Collections.emptyList();
136    } catch (KeeperException e) {
137      throw new ReplicationException("Cannot get the list of peers", e);
138    }
139  }
140
141  @Override
142  public boolean isPeerEnabled(String peerId) throws ReplicationException {
143    try {
144      return Arrays.equals(ENABLED_ZNODE_BYTES,
145        ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
146    } catch (KeeperException | InterruptedException e) {
147      throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
148    }
149  }
150
151  @Override
152  public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
153    byte[] data;
154    try {
155      data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
156    } catch (KeeperException | InterruptedException e) {
157      throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
158    }
159    if (data == null || data.length == 0) {
160      throw new ReplicationException(
161          "Replication peer config data shouldn't be empty, peerId=" + peerId);
162    }
163    try {
164      return ReplicationPeerConfigUtil.parsePeerFrom(data);
165    } catch (DeserializationException e) {
166      throw new ReplicationException(
167          "Failed to parse replication peer config for peer with id=" + peerId, e);
168    }
169  }
170}