001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.Collections; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 026import org.apache.hadoop.hbase.exceptions.DeserializationException; 027import org.apache.hadoop.hbase.zookeeper.ZKUtil; 028import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; 029import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 030import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.apache.zookeeper.KeeperException; 033 034import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 035 036/** 037 * ZK based replication peer storage. 038 */ 039@InterfaceAudience.Private 040public class ZKReplicationPeerStorage extends ZKReplicationStorageBase 041 implements ReplicationPeerStorage { 042 043 public static final String PEERS_ZNODE = "zookeeper.znode.replication.peers"; 044 public static final String PEERS_ZNODE_DEFAULT = "peers"; 045 046 public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state"; 047 public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state"; 048 049 public static final byte[] ENABLED_ZNODE_BYTES = 050 toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); 051 public static final byte[] DISABLED_ZNODE_BYTES = 052 toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); 053 054 public static final String SYNC_REPLICATION_STATE_ZNODE = "sync-rep-state"; 055 056 public static final String NEW_SYNC_REPLICATION_STATE_ZNODE = "new-sync-rep-state"; 057 058 public static final byte[] NONE_STATE_ZNODE_BYTES = 059 SyncReplicationState.toByteArray(SyncReplicationState.NONE); 060 061 /** 062 * The name of the znode that contains the replication status of a remote slave (i.e. peer) 063 * cluster. 064 */ 065 private final String peerStateNodeName; 066 067 /** 068 * The name of the znode that contains a list of all remote slave (i.e. peer) clusters. 069 */ 070 private final String peersZNode; 071 072 public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) { 073 super(zookeeper, conf); 074 this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, PEERS_STATE_ZNODE_DEFAULT); 075 String peersZNodeName = conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT); 076 this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); 077 } 078 079 public String getPeerStateNode(String peerId) { 080 return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName); 081 } 082 083 public String getPeerNode(String peerId) { 084 return ZNodePaths.joinZNode(peersZNode, peerId); 085 } 086 087 public String getSyncReplicationStateNode(String peerId) { 088 return ZNodePaths.joinZNode(getPeerNode(peerId), SYNC_REPLICATION_STATE_ZNODE); 089 } 090 091 public String getNewSyncReplicationStateNode(String peerId) { 092 return ZNodePaths.joinZNode(getPeerNode(peerId), NEW_SYNC_REPLICATION_STATE_ZNODE); 093 } 094 095 @Override 096 public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled, 097 SyncReplicationState syncReplicationState) throws ReplicationException { 098 List<ZKUtilOp> multiOps = Arrays.asList( 099 ZKUtilOp.createAndFailSilent(getPeerNode(peerId), 100 ReplicationPeerConfigUtil.toByteArray(peerConfig)), 101 ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), 102 enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES), 103 ZKUtilOp.createAndFailSilent(getSyncReplicationStateNode(peerId), 104 SyncReplicationState.toByteArray(syncReplicationState)), 105 ZKUtilOp.createAndFailSilent(getNewSyncReplicationStateNode(peerId), NONE_STATE_ZNODE_BYTES)); 106 try { 107 ZKUtil.createWithParents(zookeeper, peersZNode); 108 ZKUtil.multiOrSequential(zookeeper, multiOps, false); 109 } catch (KeeperException e) { 110 throw new ReplicationException( 111 "Could not add peer with id=" + peerId + ", peerConfig=>" + peerConfig + ", state=" 112 + (enabled ? "ENABLED" : "DISABLED") + ", syncReplicationState=" + syncReplicationState, 113 e); 114 } 115 } 116 117 @Override 118 public void removePeer(String peerId) throws ReplicationException { 119 try { 120 ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId)); 121 } catch (KeeperException e) { 122 throw new ReplicationException("Could not remove peer with id=" + peerId, e); 123 } 124 } 125 126 @Override 127 public void setPeerState(String peerId, boolean enabled) throws ReplicationException { 128 byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES; 129 try { 130 ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes); 131 } catch (KeeperException e) { 132 throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e); 133 } 134 } 135 136 @Override 137 public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) 138 throws ReplicationException { 139 try { 140 ZKUtil.setData(this.zookeeper, getPeerNode(peerId), 141 ReplicationPeerConfigUtil.toByteArray(peerConfig)); 142 } catch (KeeperException e) { 143 throw new ReplicationException( 144 "There was a problem trying to save changes to the " + "replication peer " + peerId, e); 145 } 146 } 147 148 @Override 149 public List<String> listPeerIds() throws ReplicationException { 150 try { 151 List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, peersZNode); 152 return children != null ? children : Collections.emptyList(); 153 } catch (KeeperException e) { 154 throw new ReplicationException("Cannot get the list of peers", e); 155 } 156 } 157 158 @Override 159 public boolean isPeerEnabled(String peerId) throws ReplicationException { 160 try { 161 return Arrays.equals(ENABLED_ZNODE_BYTES, 162 ZKUtil.getData(zookeeper, getPeerStateNode(peerId))); 163 } catch (KeeperException | InterruptedException e) { 164 throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); 165 } 166 } 167 168 @Override 169 public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException { 170 byte[] data; 171 try { 172 data = ZKUtil.getData(zookeeper, getPeerNode(peerId)); 173 } catch (KeeperException | InterruptedException e) { 174 throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); 175 } 176 if (data == null || data.length == 0) { 177 throw new ReplicationException( 178 "Replication peer config data shouldn't be empty, peerId=" + peerId); 179 } 180 try { 181 return ReplicationPeerConfigUtil.parsePeerFrom(data); 182 } catch (DeserializationException e) { 183 throw new ReplicationException( 184 "Failed to parse replication peer config for peer with id=" + peerId, e); 185 } 186 } 187 188 @Override 189 public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state) 190 throws ReplicationException { 191 try { 192 ZKUtil.createSetData(zookeeper, getNewSyncReplicationStateNode(peerId), 193 SyncReplicationState.toByteArray(state)); 194 } catch (KeeperException e) { 195 throw new ReplicationException( 196 "Unable to set the new sync replication state for peer with id=" + peerId, e); 197 } 198 } 199 200 @Override 201 public void transitPeerSyncReplicationState(String peerId) throws ReplicationException { 202 String newStateNode = getNewSyncReplicationStateNode(peerId); 203 try { 204 byte[] data = ZKUtil.getData(zookeeper, newStateNode); 205 ZKUtil.multiOrSequential(zookeeper, 206 Arrays.asList(ZKUtilOp.setData(newStateNode, NONE_STATE_ZNODE_BYTES), 207 ZKUtilOp.setData(getSyncReplicationStateNode(peerId), data)), 208 false); 209 } catch (KeeperException | InterruptedException e) { 210 throw new ReplicationException( 211 "Error transiting sync replication state for peer with id=" + peerId, e); 212 } 213 } 214 215 private SyncReplicationState getSyncReplicationState(String peerId, String path) 216 throws ReplicationException { 217 try { 218 byte[] data = ZKUtil.getData(zookeeper, path); 219 if (data == null || data.length == 0) { 220 if (ZKUtil.checkExists(zookeeper, getPeerNode(peerId)) != -1) { 221 // should be a peer from previous version, set the sync replication state for it. 222 ZKUtil.createSetData(zookeeper, path, NONE_STATE_ZNODE_BYTES); 223 return SyncReplicationState.NONE; 224 } else { 225 throw new ReplicationException( 226 "Replication peer sync state shouldn't be empty, peerId=" + peerId); 227 } 228 } 229 return SyncReplicationState.parseFrom(data); 230 } catch (KeeperException | InterruptedException | IOException e) { 231 throw new ReplicationException( 232 "Error getting sync replication state of path " + path + " for peer with id=" + peerId, e); 233 } 234 } 235 236 @Override 237 public SyncReplicationState getPeerNewSyncReplicationState(String peerId) 238 throws ReplicationException { 239 return getSyncReplicationState(peerId, getNewSyncReplicationStateNode(peerId)); 240 } 241 242 @Override 243 public SyncReplicationState getPeerSyncReplicationState(String peerId) 244 throws ReplicationException { 245 return getSyncReplicationState(peerId, getSyncReplicationStateNode(peerId)); 246 } 247}