001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication;
020
021import java.io.Closeable;
022import java.io.IOException;
023import java.util.HashMap;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
032import org.apache.hadoop.hbase.exceptions.DeserializationException;
033import org.apache.hadoop.hbase.log.HBaseMarkers;
034import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
036import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker;
037import org.apache.hadoop.hbase.zookeeper.ZKUtil;
038import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.zookeeper.KeeperException;
041import org.apache.zookeeper.KeeperException.NodeExistsException;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045@InterfaceAudience.Private
046public class ReplicationPeerZKImpl extends ReplicationStateZKBase
047    implements ReplicationPeer, Abortable, Closeable {
048  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class);
049
050  private ReplicationPeerConfig peerConfig;
051  private final String id;
052  private volatile PeerState peerState;
053  private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
054  private final Configuration conf;
055  private PeerStateTracker peerStateTracker;
056  private PeerConfigTracker peerConfigTracker;
057
058
059  /**
060   * Constructor that takes all the objects required to communicate with the specified peer, except
061   * for the region server addresses.
062   * @param conf configuration object to this peer
063   * @param id string representation of this peer's identifier
064   * @param peerConfig configuration for the replication peer
065   */
066  public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf,
067                               String id, ReplicationPeerConfig peerConfig,
068                               Abortable abortable)
069      throws ReplicationException {
070    super(zkWatcher, conf, abortable);
071    this.conf = conf;
072    this.peerConfig = peerConfig;
073    this.id = id;
074  }
075
076  /**
077   * start a state tracker to check whether this peer is enabled or not
078   *
079   * @param peerStateNode path to zk node which stores peer state
080   * @throws KeeperException
081   */
082  public void startStateTracker(String peerStateNode)
083      throws KeeperException {
084    ensurePeerEnabled(peerStateNode);
085    this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
086    this.peerStateTracker.start();
087    try {
088      this.readPeerStateZnode();
089    } catch (DeserializationException e) {
090      throw ZKUtil.convert(e);
091    }
092  }
093
094  private void readPeerStateZnode() throws DeserializationException {
095    this.peerState =
096        isStateEnabled(this.peerStateTracker.getData(false))
097          ? PeerState.ENABLED
098          : PeerState.DISABLED;
099  }
100
101  /**
102   * start a table-cfs tracker to listen the (table, cf-list) map change
103   * @param peerConfigNode path to zk node which stores table-cfs
104   * @throws KeeperException
105   */
106  public void startPeerConfigTracker(String peerConfigNode)
107    throws KeeperException {
108    this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
109        this);
110    this.peerConfigTracker.start();
111    this.readPeerConfig();
112  }
113
114  private ReplicationPeerConfig readPeerConfig() {
115    try {
116      byte[] data = peerConfigTracker.getData(false);
117      if (data != null) {
118        this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
119      }
120    } catch (DeserializationException e) {
121      LOG.error("", e);
122    }
123    return this.peerConfig;
124  }
125
126  @Override
127  public PeerState getPeerState() {
128    return peerState;
129  }
130
131  /**
132   * Get the identifier of this peer
133   * @return string representation of the id (short)
134   */
135  @Override
136  public String getId() {
137    return id;
138  }
139
140  /**
141   * Get the peer config object
142   * @return the ReplicationPeerConfig for this peer
143   */
144  @Override
145  public ReplicationPeerConfig getPeerConfig() {
146    return peerConfig;
147  }
148
149  /**
150   * Get the configuration object required to communicate with this peer
151   * @return configuration object
152   */
153  @Override
154  public Configuration getConfiguration() {
155    return conf;
156  }
157
158  /**
159   * Get replicable (table, cf-list) map of this peer
160   * @return the replicable (table, cf-list) map
161   */
162  @Override
163  public Map<TableName, List<String>> getTableCFs() {
164    this.tableCFs = peerConfig.getTableCFsMap();
165    return this.tableCFs;
166  }
167
168  /**
169   * Get replicable namespace set of this peer
170   * @return the replicable namespaces set
171   */
172  @Override
173  public Set<String> getNamespaces() {
174    return this.peerConfig.getNamespaces();
175  }
176
177  @Override
178  public long getPeerBandwidth() {
179    return this.peerConfig.getBandwidth();
180  }
181
182  @Override
183  public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
184    if (this.peerConfigTracker != null){
185      this.peerConfigTracker.setListener(listener);
186    }
187  }
188
189  @Override
190  public void abort(String why, Throwable e) {
191    LOG.error(HBaseMarkers.FATAL, "The ReplicationPeer corresponding to peer " +
192        peerConfig + " was aborted for the following reason(s):" + why, e);
193  }
194
195  @Override
196  public boolean isAborted() {
197    // Currently the replication peer is never "Aborted", we just log when the
198    // abort method is called.
199    return false;
200  }
201
202  @Override
203  public void close() throws IOException {
204    // TODO: stop zkw?
205  }
206
207  /**
208   * Parse the raw data from ZK to get a peer's state
209   * @param bytes raw ZK data
210   * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
211   * @throws DeserializationException
212   */
213  public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
214    ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
215    return ReplicationProtos.ReplicationState.State.ENABLED == state;
216  }
217
218  /**
219   * @param bytes Content of a state znode.
220   * @return State parsed from the passed bytes.
221   * @throws DeserializationException
222   */
223  private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
224      throws DeserializationException {
225    ProtobufUtil.expectPBMagicPrefix(bytes);
226    int pblen = ProtobufUtil.lengthOfPBMagic();
227    ReplicationProtos.ReplicationState.Builder builder =
228        ReplicationProtos.ReplicationState.newBuilder();
229    ReplicationProtos.ReplicationState state;
230    try {
231      ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
232      state = builder.build();
233      return state.getState();
234    } catch (IOException e) {
235      throw new DeserializationException(e);
236    }
237  }
238
239  /**
240   * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
241   * @param path Path to znode to check
242   * @return True if we created the znode.
243   * @throws NodeExistsException
244   * @throws KeeperException
245   */
246  private boolean ensurePeerEnabled(final String path)
247      throws NodeExistsException, KeeperException {
248    if (ZKUtil.checkExists(zookeeper, path) == -1) {
249      // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
250      // peer-state znode. This happens while adding a peer.
251      // The peer state data is set as "ENABLED" by default.
252      ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
253        ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
254      return true;
255    }
256    return false;
257  }
258
259  /**
260   * Tracker for state of this peer
261   */
262  public class PeerStateTracker extends ZKNodeTracker {
263
264    public PeerStateTracker(String peerStateZNode, ZKWatcher watcher,
265        Abortable abortable) {
266      super(watcher, peerStateZNode, abortable);
267    }
268
269    @Override
270    public synchronized void nodeDataChanged(String path) {
271      if (path.equals(node)) {
272        super.nodeDataChanged(path);
273        try {
274          readPeerStateZnode();
275        } catch (DeserializationException e) {
276          LOG.warn("Failed deserializing the content of " + path, e);
277        }
278      }
279    }
280  }
281
282  /**
283   * Tracker for PeerConfigNode of this peer
284   */
285  public class PeerConfigTracker extends ZKNodeTracker {
286
287    ReplicationPeerConfigListener listener;
288
289    public PeerConfigTracker(String peerConfigNode, ZKWatcher watcher,
290        Abortable abortable) {
291      super(watcher, peerConfigNode, abortable);
292    }
293
294    public synchronized void setListener(ReplicationPeerConfigListener listener){
295      this.listener = listener;
296    }
297
298    @Override
299    public synchronized void nodeCreated(String path) {
300      if (path.equals(node)) {
301        super.nodeCreated(path);
302        ReplicationPeerConfig config = readPeerConfig();
303        if (listener != null){
304          listener.peerConfigUpdated(config);
305        }
306      }
307    }
308
309    @Override
310    public synchronized void nodeDataChanged(String path) {
311      //superclass calls nodeCreated
312      if (path.equals(node)) {
313        super.nodeDataChanged(path);
314      }
315
316    }
317
318  }
319}