001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.ArrayList;
021import java.util.List;
022import java.util.concurrent.CopyOnWriteArrayList;
023import org.apache.hadoop.hbase.Abortable;
024import org.apache.hadoop.hbase.Stoppable;
025import org.apache.hadoop.hbase.zookeeper.ZKListener;
026import org.apache.hadoop.hbase.zookeeper.ZKUtil;
027import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.zookeeper.KeeperException;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is
035 * responsible for handling replication events that are defined in the ReplicationListener
036 * interface.
037 */
038@InterfaceAudience.Private
039public class ReplicationTrackerZKImpl implements ReplicationTracker {
040
041  private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
042
043  // Zookeeper
044  private final ZKWatcher zookeeper;
045  // Server to abort.
046  private final Abortable abortable;
047  // All about stopping
048  private final Stoppable stopper;
049  // listeners to be notified
050  private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
051  // List of all the other region servers in this cluster
052  private final ArrayList<String> otherRegionServers = new ArrayList<>();
053
054  public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) {
055    this.zookeeper = zookeeper;
056    this.abortable = abortable;
057    this.stopper = stopper;
058    this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
059    // watch the changes
060    refreshOtherRegionServersList(true);
061  }
062
063  @Override
064  public void registerListener(ReplicationListener listener) {
065    listeners.add(listener);
066  }
067
068  @Override
069  public void removeListener(ReplicationListener listener) {
070    listeners.remove(listener);
071  }
072
073  /**
074   * Return a snapshot of the current region servers.
075   */
076  @Override
077  public List<String> getListOfRegionServers() {
078    refreshOtherRegionServersList(false);
079
080    List<String> list = null;
081    synchronized (otherRegionServers) {
082      list = new ArrayList<>(otherRegionServers);
083    }
084    return list;
085  }
086
087  /**
088   * Watcher used to be notified of the other region server's death in the local cluster. It
089   * initiates the process to transfer the queues if it is able to grab the lock.
090   */
091  public class OtherRegionServerWatcher extends ZKListener {
092
093    /**
094     * Construct a ZooKeeper event listener.
095     */
096    public OtherRegionServerWatcher(ZKWatcher watcher) {
097      super(watcher);
098    }
099
100    /**
101     * Called when a new node has been created.
102     * @param path full path of the new node
103     */
104    @Override
105    public void nodeCreated(String path) {
106      refreshListIfRightPath(path);
107    }
108
109    /**
110     * Called when a node has been deleted
111     * @param path full path of the deleted node
112     */
113    @Override
114    public void nodeDeleted(String path) {
115      if (stopper.isStopped()) {
116        return;
117      }
118      boolean cont = refreshListIfRightPath(path);
119      if (!cont) {
120        return;
121      }
122      LOG.info(path + " znode expired, triggering replicatorRemoved event");
123      for (ReplicationListener rl : listeners) {
124        rl.regionServerRemoved(getZNodeName(path));
125      }
126    }
127
128    /**
129     * Called when an existing node has a child node added or removed.
130     * @param path full path of the node whose children have changed
131     */
132    @Override
133    public void nodeChildrenChanged(String path) {
134      if (stopper.isStopped()) {
135        return;
136      }
137      refreshListIfRightPath(path);
138    }
139
140    private boolean refreshListIfRightPath(String path) {
141      if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) {
142        return false;
143      }
144      return refreshOtherRegionServersList(true);
145    }
146  }
147
148  /**
149   * Extracts the znode name of a peer cluster from a ZK path
150   * @param fullPath Path to extract the id from
151   * @return the id or an empty string if path is invalid
152   */
153  private String getZNodeName(String fullPath) {
154    String[] parts = fullPath.split("/");
155    return parts.length > 0 ? parts[parts.length - 1] : "";
156  }
157
158  /**
159   * Reads the list of region servers from ZK and atomically clears our local view of it and
160   * replaces it with the updated list.
161   * @return true if the local list of the other region servers was updated with the ZK data (even
162   *         if it was empty), false if the data was missing in ZK
163   */
164  private boolean refreshOtherRegionServersList(boolean watch) {
165    List<String> newRsList = getRegisteredRegionServers(watch);
166    if (newRsList == null) {
167      return false;
168    } else {
169      synchronized (otherRegionServers) {
170        otherRegionServers.clear();
171        otherRegionServers.addAll(newRsList);
172      }
173    }
174    return true;
175  }
176
177  /**
178   * Get a list of all the other region servers in this cluster and set a watch
179   * @return a list of server nanes
180   */
181  private List<String> getRegisteredRegionServers(boolean watch) {
182    List<String> result = null;
183    try {
184      if (watch) {
185        result = ZKUtil.listChildrenAndWatchThem(this.zookeeper,
186                this.zookeeper.getZNodePaths().rsZNode);
187      } else {
188        result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.getZNodePaths().rsZNode);
189      }
190    } catch (KeeperException e) {
191      this.abortable.abort("Get list of registered region servers", e);
192    }
193    return result;
194  }
195}