001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication;
020
021import java.util.ArrayList;
022import java.util.List;
023import java.util.concurrent.CopyOnWriteArrayList;
024
025import org.apache.hadoop.hbase.zookeeper.ZKListener;
026import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.Stoppable;
031import org.apache.hadoop.hbase.zookeeper.ZKUtil;
032import org.apache.zookeeper.KeeperException;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is
038 * responsible for handling replication events that are defined in the ReplicationListener
039 * interface.
040 */
041@InterfaceAudience.Private
042public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
043
044  private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
045  // All about stopping
046  private final Stoppable stopper;
047  // listeners to be notified
048  private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
049  // List of all the other region servers in this cluster
050  private final ArrayList<String> otherRegionServers = new ArrayList<>();
051  private final ReplicationPeers replicationPeers;
052
053  public ReplicationTrackerZKImpl(ZKWatcher zookeeper,
054      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
055      Stoppable stopper) {
056    super(zookeeper, conf, abortable);
057    this.replicationPeers = replicationPeers;
058    this.stopper = stopper;
059    this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
060    this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
061  }
062
063  @Override
064  public void registerListener(ReplicationListener listener) {
065    listeners.add(listener);
066  }
067
068  @Override
069  public void removeListener(ReplicationListener listener) {
070    listeners.remove(listener);
071  }
072
073  /**
074   * Return a snapshot of the current region servers.
075   */
076  @Override
077  public List<String> getListOfRegionServers() {
078    refreshOtherRegionServersList();
079
080    List<String> list = null;
081    synchronized (otherRegionServers) {
082      list = new ArrayList<>(otherRegionServers);
083    }
084    return list;
085  }
086
087  /**
088   * Watcher used to be notified of the other region server's death in the local cluster. It
089   * initiates the process to transfer the queues if it is able to grab the lock.
090   */
091  public class OtherRegionServerWatcher extends ZKListener {
092
093    /**
094     * Construct a ZooKeeper event listener.
095     */
096    public OtherRegionServerWatcher(ZKWatcher watcher) {
097      super(watcher);
098    }
099
100    /**
101     * Called when a new node has been created.
102     * @param path full path of the new node
103     */
104    @Override
105    public void nodeCreated(String path) {
106      refreshListIfRightPath(path);
107    }
108
109    /**
110     * Called when a node has been deleted
111     * @param path full path of the deleted node
112     */
113    @Override
114    public void nodeDeleted(String path) {
115      if (stopper.isStopped()) {
116        return;
117      }
118      boolean cont = refreshListIfRightPath(path);
119      if (!cont) {
120        return;
121      }
122      LOG.info(path + " znode expired, triggering replicatorRemoved event");
123      for (ReplicationListener rl : listeners) {
124        rl.regionServerRemoved(getZNodeName(path));
125      }
126    }
127
128    /**
129     * Called when an existing node has a child node added or removed.
130     * @param path full path of the node whose children have changed
131     */
132    @Override
133    public void nodeChildrenChanged(String path) {
134      if (stopper.isStopped()) {
135        return;
136      }
137      refreshListIfRightPath(path);
138    }
139
140    private boolean refreshListIfRightPath(String path) {
141      if (!path.startsWith(this.watcher.znodePaths.rsZNode)) {
142        return false;
143      }
144      return refreshOtherRegionServersList();
145    }
146  }
147
148  /**
149   * Watcher used to follow the creation and deletion of peer clusters.
150   */
151  public class PeersWatcher extends ZKListener {
152
153    /**
154     * Construct a ZooKeeper event listener.
155     */
156    public PeersWatcher(ZKWatcher watcher) {
157      super(watcher);
158    }
159
160    /**
161     * Called when a node has been deleted
162     * @param path full path of the deleted node
163     */
164    @Override
165    public void nodeDeleted(String path) {
166      List<String> peers = refreshPeersList(path);
167      if (peers == null) {
168        return;
169      }
170      if (isPeerPath(path)) {
171        String id = getZNodeName(path);
172        LOG.info(path + " znode expired, triggering peerRemoved event");
173        for (ReplicationListener rl : listeners) {
174          rl.peerRemoved(id);
175        }
176      }
177    }
178
179    /**
180     * Called when an existing node has a child node added or removed.
181     * @param path full path of the node whose children have changed
182     */
183    @Override
184    public void nodeChildrenChanged(String path) {
185      List<String> peers = refreshPeersList(path);
186      if (peers == null) {
187        return;
188      }
189      LOG.info(path + " znode expired, triggering peerListChanged event");
190      for (ReplicationListener rl : listeners) {
191        rl.peerListChanged(peers);
192      }
193    }
194  }
195
196  /**
197   * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also
198   * reset the watches.
199   * @param path path to check against
200   * @return A list of peers' identifiers if the event concerns this watcher, else null.
201   */
202  private List<String> refreshPeersList(String path) {
203    if (!path.startsWith(getPeersZNode())) {
204      return null;
205    }
206    return this.replicationPeers.getAllPeerIds();
207  }
208
209  private String getPeersZNode() {
210    return this.peersZNode;
211  }
212
213  /**
214   * Extracts the znode name of a peer cluster from a ZK path
215   * @param fullPath Path to extract the id from
216   * @return the id or an empty string if path is invalid
217   */
218  private String getZNodeName(String fullPath) {
219    String[] parts = fullPath.split("/");
220    return parts.length > 0 ? parts[parts.length - 1] : "";
221  }
222
223  /**
224   * Reads the list of region servers from ZK and atomically clears our local view of it and
225   * replaces it with the updated list.
226   * @return true if the local list of the other region servers was updated with the ZK data (even
227   *         if it was empty), false if the data was missing in ZK
228   */
229  private boolean refreshOtherRegionServersList() {
230    List<String> newRsList = getRegisteredRegionServers();
231    if (newRsList == null) {
232      return false;
233    } else {
234      synchronized (otherRegionServers) {
235        otherRegionServers.clear();
236        otherRegionServers.addAll(newRsList);
237      }
238    }
239    return true;
240  }
241
242  /**
243   * Get a list of all the other region servers in this cluster and set a watch
244   * @return a list of server nanes
245   */
246  private List<String> getRegisteredRegionServers() {
247    List<String> result = null;
248    try {
249      result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
250    } catch (KeeperException e) {
251      this.abortable.abort("Get list of registered region servers", e);
252    }
253    return result;
254  }
255}