001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.List;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
026import org.apache.hadoop.hbase.exceptions.DeserializationException;
027import org.apache.hadoop.hbase.zookeeper.ZKUtil;
028import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
029import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
030import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.apache.zookeeper.KeeperException;
033
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
035
036/**
037 * ZK based replication peer storage.
038 */
039@InterfaceAudience.Private
040public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
041  implements ReplicationPeerStorage {
042
043  public static final String PEERS_ZNODE = "zookeeper.znode.replication.peers";
044  public static final String PEERS_ZNODE_DEFAULT = "peers";
045
046  public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state";
047  public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
048
049  public static final byte[] ENABLED_ZNODE_BYTES =
050    toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
051  public static final byte[] DISABLED_ZNODE_BYTES =
052    toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
053
054  public static final String SYNC_REPLICATION_STATE_ZNODE = "sync-rep-state";
055
056  public static final String NEW_SYNC_REPLICATION_STATE_ZNODE = "new-sync-rep-state";
057
058  public static final byte[] NONE_STATE_ZNODE_BYTES =
059    SyncReplicationState.toByteArray(SyncReplicationState.NONE);
060
061  /**
062   * The name of the znode that contains the replication status of a remote slave (i.e. peer)
063   * cluster.
064   */
065  private final String peerStateNodeName;
066
067  /**
068   * The name of the znode that contains a list of all remote slave (i.e. peer) clusters.
069   */
070  private final String peersZNode;
071
072  public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) {
073    super(zookeeper, conf);
074    this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, PEERS_STATE_ZNODE_DEFAULT);
075    String peersZNodeName = conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT);
076    this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
077  }
078
079  public String getPeerStateNode(String peerId) {
080    return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName);
081  }
082
083  public String getPeerNode(String peerId) {
084    return ZNodePaths.joinZNode(peersZNode, peerId);
085  }
086
087  public String getSyncReplicationStateNode(String peerId) {
088    return ZNodePaths.joinZNode(getPeerNode(peerId), SYNC_REPLICATION_STATE_ZNODE);
089  }
090
091  public String getNewSyncReplicationStateNode(String peerId) {
092    return ZNodePaths.joinZNode(getPeerNode(peerId), NEW_SYNC_REPLICATION_STATE_ZNODE);
093  }
094
095  @Override
096  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled,
097    SyncReplicationState syncReplicationState) throws ReplicationException {
098    List<ZKUtilOp> multiOps = Arrays.asList(
099      ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
100        ReplicationPeerConfigUtil.toByteArray(peerConfig)),
101      ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
102        enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES),
103      ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId),
104        SyncReplicationState.toByteArray(syncReplicationState)),
105      ZKUtilOp.createAndFailSilent(getNewSyncReplicationStateNode(peerId), NONE_STATE_ZNODE_BYTES));
106    try {
107      ZKUtil.createWithParents(zookeeper, peersZNode);
108      ZKUtil.multiOrSequential(zookeeper, multiOps, false);
109    } catch (KeeperException e) {
110      throw new ReplicationException(
111        "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state="
112          + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState,
113        e);
114    }
115  }
116
117  @Override
118  public void removePeer(String peerId) throws ReplicationException {
119    try {
120      ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId));
121    } catch (KeeperException e) {
122      throw new ReplicationException("Could not remove peer with id=" + peerId, e);
123    }
124  }
125
126  @Override
127  public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
128    byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
129    try {
130      ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
131    } catch (KeeperException e) {
132      throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e);
133    }
134  }
135
136  @Override
137  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
138    throws ReplicationException {
139    try {
140      ZKUtil.setData(this.zookeeper, getPeerNode(peerId),
141        ReplicationPeerConfigUtil.toByteArray(peerConfig));
142    } catch (KeeperException e) {
143      throw new ReplicationException(
144        "There was a problem trying to save changes to the " + "replication peer " + peerId, e);
145    }
146  }
147
148  @Override
149  public List<String> listPeerIds() throws ReplicationException {
150    try {
151      List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, peersZNode);
152      return children != null ? children : Collections.emptyList();
153    } catch (KeeperException e) {
154      throw new ReplicationException("Cannot get the list of peers", e);
155    }
156  }
157
158  @Override
159  public boolean isPeerEnabled(String peerId) throws ReplicationException {
160    try {
161      return Arrays.equals(ENABLED_ZNODE_BYTES,
162        ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
163    } catch (KeeperException | InterruptedException e) {
164      throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
165    }
166  }
167
168  @Override
169  public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
170    byte[] data;
171    try {
172      data = ZKUtil.getData(zookeeper, getPeerNode(peerId));
173    } catch (KeeperException | InterruptedException e) {
174      throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e);
175    }
176    if (data == null || data.length == 0) {
177      throw new ReplicationException(
178        "Replication peer config data shouldn't be empty, peerId=" + peerId);
179    }
180    try {
181      return ReplicationPeerConfigUtil.parsePeerFrom(data);
182    } catch (DeserializationException e) {
183      throw new ReplicationException(
184        "Failed to parse replication peer config for peer with id=" + peerId, e);
185    }
186  }
187
188  @Override
189  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
190    throws ReplicationException {
191    try {
192      ZKUtil.createSetData(zookeeper, getNewSyncReplicationStateNode(peerId),
193        SyncReplicationState.toByteArray(state));
194    } catch (KeeperException e) {
195      throw new ReplicationException(
196        "Unable to set the new sync replication state for peer with id=" + peerId, e);
197    }
198  }
199
200  @Override
201  public void transitPeerSyncReplicationState(String peerId) throws ReplicationException {
202    String newStateNode = getNewSyncReplicationStateNode(peerId);
203    try {
204      byte[] data = ZKUtil.getData(zookeeper, newStateNode);
205      ZKUtil.multiOrSequential(zookeeper,
206        Arrays.asList(ZKUtilOp.setData(newStateNode, NONE_STATE_ZNODE_BYTES),
207          ZKUtilOp.setData(getSyncReplicationStateNode(peerId), data)),
208        false);
209    } catch (KeeperException | InterruptedException e) {
210      throw new ReplicationException(
211        "Error transiting sync replication state for peer with id=" + peerId, e);
212    }
213  }
214
215  private SyncReplicationState getSyncReplicationState(String peerId, String path)
216    throws ReplicationException {
217    try {
218      byte[] data = ZKUtil.getData(zookeeper, path);
219      if (data == null || data.length == 0) {
220        if (ZKUtil.checkExists(zookeeper, getPeerNode(peerId)) != -1) {
221          // should be a peer from previous version, set the sync replication state for it.
222          ZKUtil.createSetData(zookeeper, path, NONE_STATE_ZNODE_BYTES);
223          return SyncReplicationState.NONE;
224        } else {
225          throw new ReplicationException(
226            "Replication peer sync state shouldn't be empty, peerId=" + peerId);
227        }
228      }
229      return SyncReplicationState.parseFrom(data);
230    } catch (KeeperException | InterruptedException | IOException e) {
231      throw new ReplicationException(
232        "Error getting sync replication state of path " + path + " for peer with id=" + peerId, e);
233    }
234  }
235
236  @Override
237  public SyncReplicationState getPeerNewSyncReplicationState(String peerId)
238    throws ReplicationException {
239    return getSyncReplicationState(peerId, getNewSyncReplicationStateNode(peerId));
240  }
241
242  @Override
243  public SyncReplicationState getPeerSyncReplicationState(String peerId)
244    throws ReplicationException {
245    return getSyncReplicationState(peerId, getSyncReplicationStateNode(peerId));
246  }
247}