View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.concurrent.CopyOnWriteArrayList;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.Abortable;
30  import org.apache.hadoop.hbase.Stoppable;
31  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
32  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
33  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
34  import org.apache.zookeeper.KeeperException;
35  
36  /**
37   * This class is a Zookeeper implementation of the ReplicationTracker interface. This class is
38   * responsible for handling replication events that are defined in the ReplicationListener
39   * interface.
40   */
41  @InterfaceAudience.Private
42  public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
43  
44    private static final Log LOG = LogFactory.getLog(ReplicationTrackerZKImpl.class);
45    // All about stopping
46    private final Stoppable stopper;
47    // listeners to be notified
48    private final List<ReplicationListener> listeners =
49        new CopyOnWriteArrayList<ReplicationListener>();
50    // List of all the other region servers in this cluster
51    private final ArrayList<String> otherRegionServers = new ArrayList<String>();
52    private final ReplicationPeers replicationPeers;
53  
54    public ReplicationTrackerZKImpl(ZooKeeperWatcher zookeeper,
55        final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
56        Stoppable stopper) {
57      super(zookeeper, conf, abortable);
58      this.replicationPeers = replicationPeers;
59      this.stopper = stopper;
60      this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
61      this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
62    }
63  
64    @Override
65    public void registerListener(ReplicationListener listener) {
66      listeners.add(listener);
67    }
68  
69    @Override
70    public void removeListener(ReplicationListener listener) {
71      listeners.remove(listener);
72    }
73  
74    /**
75     * Return a snapshot of the current region servers.
76     */
77    @Override
78    public List<String> getListOfRegionServers() {
79      refreshOtherRegionServersList();
80  
81      List<String> list = null;
82      synchronized (otherRegionServers) {
83        list = new ArrayList<String>(otherRegionServers);
84      }
85      return list;
86    }
87  
88    /**
89     * Watcher used to be notified of the other region server's death in the local cluster. It
90     * initiates the process to transfer the queues if it is able to grab the lock.
91     */
92    public class OtherRegionServerWatcher extends ZooKeeperListener {
93  
94      /**
95       * Construct a ZooKeeper event listener.
96       */
97      public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
98        super(watcher);
99      }
100 
101     /**
102      * Called when a new node has been created.
103      * @param path full path of the new node
104      */
105     public void nodeCreated(String path) {
106       refreshListIfRightPath(path);
107     }
108 
109     /**
110      * Called when a node has been deleted
111      * @param path full path of the deleted node
112      */
113     public void nodeDeleted(String path) {
114       if (stopper.isStopped()) {
115         return;
116       }
117       boolean cont = refreshListIfRightPath(path);
118       if (!cont) {
119         return;
120       }
121       LOG.info(path + " znode expired, triggering replicatorRemoved event");
122       for (ReplicationListener rl : listeners) {
123         rl.regionServerRemoved(getZNodeName(path));
124       }
125     }
126 
127     /**
128      * Called when an existing node has a child node added or removed.
129      * @param path full path of the node whose children have changed
130      */
131     public void nodeChildrenChanged(String path) {
132       if (stopper.isStopped()) {
133         return;
134       }
135       refreshListIfRightPath(path);
136     }
137 
138     private boolean refreshListIfRightPath(String path) {
139       if (!path.startsWith(this.watcher.rsZNode)) {
140         return false;
141       }
142       return refreshOtherRegionServersList();
143     }
144   }
145 
146   /**
147    * Watcher used to follow the creation and deletion of peer clusters.
148    */
149   public class PeersWatcher extends ZooKeeperListener {
150 
151     /**
152      * Construct a ZooKeeper event listener.
153      */
154     public PeersWatcher(ZooKeeperWatcher watcher) {
155       super(watcher);
156     }
157 
158     /**
159      * Called when a node has been deleted
160      * @param path full path of the deleted node
161      */
162     public void nodeDeleted(String path) {
163       List<String> peers = refreshPeersList(path);
164       if (peers == null) {
165         return;
166       }
167       if (isPeerPath(path)) {
168         String id = getZNodeName(path);
169         LOG.info(path + " znode expired, triggering peerRemoved event");
170         for (ReplicationListener rl : listeners) {
171           rl.peerRemoved(id);
172         }
173       }
174     }
175 
176     /**
177      * Called when an existing node has a child node added or removed.
178      * @param path full path of the node whose children have changed
179      */
180     public void nodeChildrenChanged(String path) {
181       List<String> peers = refreshPeersList(path);
182       if (peers == null) {
183         return;
184       }
185       LOG.info(path + " znode expired, triggering peerListChanged event");
186       for (ReplicationListener rl : listeners) {
187         rl.peerListChanged(peers);
188       }
189     }
190   }
191 
192   /**
193    * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also
194    * reset the watches.
195    * @param path path to check against
196    * @return A list of peers' identifiers if the event concerns this watcher, else null.
197    */
198   private List<String> refreshPeersList(String path) {
199     if (!path.startsWith(getPeersZNode())) {
200       return null;
201     }
202     return this.replicationPeers.getAllPeerIds();
203   }
204 
205   private String getPeersZNode() {
206     return this.peersZNode;
207   }
208 
209   /**
210    * Extracts the znode name of a peer cluster from a ZK path
211    * @param fullPath Path to extract the id from
212    * @return the id or an empty string if path is invalid
213    */
214   private String getZNodeName(String fullPath) {
215     String[] parts = fullPath.split("/");
216     return parts.length > 0 ? parts[parts.length - 1] : "";
217   }
218 
219   /**
220    * Reads the list of region servers from ZK and atomically clears our local view of it and
221    * replaces it with the updated list.
222    * @return true if the local list of the other region servers was updated with the ZK data (even
223    *         if it was empty), false if the data was missing in ZK
224    */
225   private boolean refreshOtherRegionServersList() {
226     List<String> newRsList = getRegisteredRegionServers();
227     if (newRsList == null) {
228       return false;
229     } else {
230       synchronized (otherRegionServers) {
231         otherRegionServers.clear();
232         otherRegionServers.addAll(newRsList);
233       }
234     }
235     return true;
236   }
237 
238   /**
239    * Get a list of all the other region servers in this cluster and set a watch
240    * @return a list of server nanes
241    */
242   private List<String> getRegisteredRegionServers() {
243     List<String> result = null;
244     try {
245       result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.rsZNode);
246     } catch (KeeperException e) {
247       this.abortable.abort("Get list of registered region servers", e);
248     }
249     return result;
250   }
251 }