001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.replication; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.util.HashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 032import org.apache.hadoop.hbase.exceptions.DeserializationException; 033import org.apache.hadoop.hbase.log.HBaseMarkers; 034import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 036import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; 037import org.apache.hadoop.hbase.zookeeper.ZKUtil; 038import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.apache.zookeeper.KeeperException; 041import org.apache.zookeeper.KeeperException.NodeExistsException; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045@InterfaceAudience.Private 046public class ReplicationPeerZKImpl extends ReplicationStateZKBase 047 implements ReplicationPeer, Abortable, Closeable { 048 private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class); 049 050 private ReplicationPeerConfig peerConfig; 051 private final String id; 052 private volatile PeerState peerState; 053 private volatile Map<TableName, List<String>> tableCFs = new HashMap<>(); 054 private final Configuration conf; 055 private PeerStateTracker peerStateTracker; 056 private PeerConfigTracker peerConfigTracker; 057 058 059 /** 060 * Constructor that takes all the objects required to communicate with the specified peer, except 061 * for the region server addresses. 062 * @param conf configuration object to this peer 063 * @param id string representation of this peer's identifier 064 * @param peerConfig configuration for the replication peer 065 */ 066 public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, 067 String id, ReplicationPeerConfig peerConfig, 068 Abortable abortable) 069 throws ReplicationException { 070 super(zkWatcher, conf, abortable); 071 this.conf = conf; 072 this.peerConfig = peerConfig; 073 this.id = id; 074 } 075 076 /** 077 * start a state tracker to check whether this peer is enabled or not 078 * 079 * @param peerStateNode path to zk node which stores peer state 080 * @throws KeeperException 081 */ 082 public void startStateTracker(String peerStateNode) 083 throws KeeperException { 084 ensurePeerEnabled(peerStateNode); 085 this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); 086 this.peerStateTracker.start(); 087 try { 088 this.readPeerStateZnode(); 089 } catch (DeserializationException e) { 090 throw ZKUtil.convert(e); 091 } 092 } 093 094 private void readPeerStateZnode() throws DeserializationException { 095 this.peerState = 096 isStateEnabled(this.peerStateTracker.getData(false)) 097 ? PeerState.ENABLED 098 : PeerState.DISABLED; 099 } 100 101 /** 102 * start a table-cfs tracker to listen the (table, cf-list) map change 103 * @param peerConfigNode path to zk node which stores table-cfs 104 * @throws KeeperException 105 */ 106 public void startPeerConfigTracker(String peerConfigNode) 107 throws KeeperException { 108 this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper, 109 this); 110 this.peerConfigTracker.start(); 111 this.readPeerConfig(); 112 } 113 114 private ReplicationPeerConfig readPeerConfig() { 115 try { 116 byte[] data = peerConfigTracker.getData(false); 117 if (data != null) { 118 this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data); 119 } 120 } catch (DeserializationException e) { 121 LOG.error("", e); 122 } 123 return this.peerConfig; 124 } 125 126 @Override 127 public PeerState getPeerState() { 128 return peerState; 129 } 130 131 /** 132 * Get the identifier of this peer 133 * @return string representation of the id (short) 134 */ 135 @Override 136 public String getId() { 137 return id; 138 } 139 140 /** 141 * Get the peer config object 142 * @return the ReplicationPeerConfig for this peer 143 */ 144 @Override 145 public ReplicationPeerConfig getPeerConfig() { 146 return peerConfig; 147 } 148 149 /** 150 * Get the configuration object required to communicate with this peer 151 * @return configuration object 152 */ 153 @Override 154 public Configuration getConfiguration() { 155 return conf; 156 } 157 158 /** 159 * Get replicable (table, cf-list) map of this peer 160 * @return the replicable (table, cf-list) map 161 */ 162 @Override 163 public Map<TableName, List<String>> getTableCFs() { 164 this.tableCFs = peerConfig.getTableCFsMap(); 165 return this.tableCFs; 166 } 167 168 /** 169 * Get replicable namespace set of this peer 170 * @return the replicable namespaces set 171 */ 172 @Override 173 public Set<String> getNamespaces() { 174 return this.peerConfig.getNamespaces(); 175 } 176 177 @Override 178 public long getPeerBandwidth() { 179 return this.peerConfig.getBandwidth(); 180 } 181 182 @Override 183 public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { 184 if (this.peerConfigTracker != null){ 185 this.peerConfigTracker.setListener(listener); 186 } 187 } 188 189 @Override 190 public void abort(String why, Throwable e) { 191 LOG.error(HBaseMarkers.FATAL, "The ReplicationPeer corresponding to peer " + 192 peerConfig + " was aborted for the following reason(s):" + why, e); 193 } 194 195 @Override 196 public boolean isAborted() { 197 // Currently the replication peer is never "Aborted", we just log when the 198 // abort method is called. 199 return false; 200 } 201 202 @Override 203 public void close() throws IOException { 204 // TODO: stop zkw? 205 } 206 207 /** 208 * Parse the raw data from ZK to get a peer's state 209 * @param bytes raw ZK data 210 * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state. 211 * @throws DeserializationException 212 */ 213 public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { 214 ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes); 215 return ReplicationProtos.ReplicationState.State.ENABLED == state; 216 } 217 218 /** 219 * @param bytes Content of a state znode. 220 * @return State parsed from the passed bytes. 221 * @throws DeserializationException 222 */ 223 private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes) 224 throws DeserializationException { 225 ProtobufUtil.expectPBMagicPrefix(bytes); 226 int pblen = ProtobufUtil.lengthOfPBMagic(); 227 ReplicationProtos.ReplicationState.Builder builder = 228 ReplicationProtos.ReplicationState.newBuilder(); 229 ReplicationProtos.ReplicationState state; 230 try { 231 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); 232 state = builder.build(); 233 return state.getState(); 234 } catch (IOException e) { 235 throw new DeserializationException(e); 236 } 237 } 238 239 /** 240 * Utility method to ensure an ENABLED znode is in place; if not present, we create it. 241 * @param path Path to znode to check 242 * @return True if we created the znode. 243 * @throws NodeExistsException 244 * @throws KeeperException 245 */ 246 private boolean ensurePeerEnabled(final String path) 247 throws NodeExistsException, KeeperException { 248 if (ZKUtil.checkExists(zookeeper, path) == -1) { 249 // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the 250 // peer-state znode. This happens while adding a peer. 251 // The peer state data is set as "ENABLED" by default. 252 ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, 253 ReplicationStateZKBase.ENABLED_ZNODE_BYTES); 254 return true; 255 } 256 return false; 257 } 258 259 /** 260 * Tracker for state of this peer 261 */ 262 public class PeerStateTracker extends ZKNodeTracker { 263 264 public PeerStateTracker(String peerStateZNode, ZKWatcher watcher, 265 Abortable abortable) { 266 super(watcher, peerStateZNode, abortable); 267 } 268 269 @Override 270 public synchronized void nodeDataChanged(String path) { 271 if (path.equals(node)) { 272 super.nodeDataChanged(path); 273 try { 274 readPeerStateZnode(); 275 } catch (DeserializationException e) { 276 LOG.warn("Failed deserializing the content of " + path, e); 277 } 278 } 279 } 280 } 281 282 /** 283 * Tracker for PeerConfigNode of this peer 284 */ 285 public class PeerConfigTracker extends ZKNodeTracker { 286 287 ReplicationPeerConfigListener listener; 288 289 public PeerConfigTracker(String peerConfigNode, ZKWatcher watcher, 290 Abortable abortable) { 291 super(watcher, peerConfigNode, abortable); 292 } 293 294 public synchronized void setListener(ReplicationPeerConfigListener listener){ 295 this.listener = listener; 296 } 297 298 @Override 299 public synchronized void nodeCreated(String path) { 300 if (path.equals(node)) { 301 super.nodeCreated(path); 302 ReplicationPeerConfig config = readPeerConfig(); 303 if (listener != null){ 304 listener.peerConfigUpdated(config); 305 } 306 } 307 } 308 309 @Override 310 public synchronized void nodeDataChanged(String path) { 311 //superclass calls nodeCreated 312 if (path.equals(node)) { 313 super.nodeDataChanged(path); 314 } 315 316 } 317 318 } 319}