/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is
 * responsible for handling replication events that are defined in the ReplicationListener
 * interface.
 * <p>
 * Two ZooKeeper listeners are registered on construction: one that follows the znodes under the
 * region-server znode and one that follows the znodes under the peers znode. Their callbacks are
 * translated into {@link ReplicationListener} notifications.
 */
@InterfaceAudience.Private
public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
  // All about stopping
  private final Stoppable stopper;
  // listeners to be notified; copy-on-write so callbacks can iterate without locking
  private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>();
  // List of all the other region servers in this cluster; every access synchronizes on the list
  private final List<String> otherRegionServers = new ArrayList<>();
  private final ReplicationPeers replicationPeers;

  /**
   * Creates the tracker and registers its two ZooKeeper listeners on the given watcher.
   * @param zookeeper watcher the listeners are registered on
   * @param replicationPeers source of the peer ids reported in peer events
   * @param conf configuration handed to the superclass
   * @param abortable asked to abort if listing the region servers fails with a KeeperException
   * @param stopper consulted by the region-server watcher to skip events once stopped
   */
  public ReplicationTrackerZKImpl(ZKWatcher zookeeper,
      final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
      Stoppable stopper) {
    super(zookeeper, conf, abortable);
    this.replicationPeers = replicationPeers;
    this.stopper = stopper;
    this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
    this.zookeeper.registerListener(new PeersWatcher(this.zookeeper));
  }

  @Override
  public void registerListener(ReplicationListener listener) {
    listeners.add(listener);
  }

  @Override
  public void removeListener(ReplicationListener listener) {
    listeners.remove(listener);
  }

  /**
   * Return a snapshot of the current region servers. The local view is refreshed first; if the
   * refresh fails, the previously cached view is what gets copied.
   * @return a new list that the caller may modify freely
   */
  @Override
  public List<String> getListOfRegionServers() {
    refreshOtherRegionServersList();
    synchronized (otherRegionServers) {
      return new ArrayList<>(otherRegionServers);
    }
  }

  /**
   * Watcher used to be notified of the other region server's death in the local cluster. It
   * initiates the process to transfer the queues if it is able to grab the lock.
   */
  public class OtherRegionServerWatcher extends ZKListener {

    /**
     * Construct a ZooKeeper event listener.
     */
    public OtherRegionServerWatcher(ZKWatcher watcher) {
      super(watcher);
    }

    /**
     * Called when a new node has been created. Unlike the other callbacks of this watcher, the
     * stopper is not consulted here.
     * @param path full path of the new node
     */
    @Override
    public void nodeCreated(String path) {
      refreshListIfRightPath(path);
    }

    /**
     * Called when a node has been deleted. If the path belongs to the region-server znode and the
     * local list could be refreshed, every listener is told that the region server was removed.
     * @param path full path of the deleted node
     */
    @Override
    public void nodeDeleted(String path) {
      if (stopper.isStopped()) {
        return;
      }
      if (!refreshListIfRightPath(path)) {
        return;
      }
      LOG.info("{} znode expired, triggering replicatorRemoved event", path);
      // The name does not depend on the listener, so compute it once.
      String regionServer = getZNodeName(path);
      for (ReplicationListener rl : listeners) {
        rl.regionServerRemoved(regionServer);
      }
    }

    /**
     * Called when an existing node has a child node added or removed.
     * @param path full path of the node whose children have changed
     */
    @Override
    public void nodeChildrenChanged(String path) {
      if (stopper.isStopped()) {
        return;
      }
      refreshListIfRightPath(path);
    }

    /**
     * Refreshes the local region-server list when the path starts with the region-server znode.
     * @param path full path the event was raised for
     * @return false if the path is not ours or the refresh failed, true otherwise
     */
    private boolean refreshListIfRightPath(String path) {
      if (!path.startsWith(this.watcher.znodePaths.rsZNode)) {
        return false;
      }
      return refreshOtherRegionServersList();
    }
  }

  /**
   * Watcher used to follow the creation and deletion of peer clusters.
   */
  public class PeersWatcher extends ZKListener {

    /**
     * Construct a ZooKeeper event listener.
     */
    public PeersWatcher(ZKWatcher watcher) {
      super(watcher);
    }

    /**
     * Called when a node has been deleted. Listeners are notified only when the path is under the
     * peers znode and is itself a peer path.
     * @param path full path of the deleted node
     */
    @Override
    public void nodeDeleted(String path) {
      // The returned ids are not used here; the call decides whether the event concerns us.
      List<String> peers = refreshPeersList(path);
      if (peers == null) {
        return;
      }
      if (!isPeerPath(path)) {
        return;
      }
      String id = getZNodeName(path);
      LOG.info("{} znode expired, triggering peerRemoved event", path);
      for (ReplicationListener rl : listeners) {
        rl.peerRemoved(id);
      }
    }

    /**
     * Called when an existing node has a child node added or removed.
     * @param path full path of the node whose children have changed
     */
    @Override
    public void nodeChildrenChanged(String path) {
      List<String> peers = refreshPeersList(path);
      if (peers == null) {
        return;
      }
      // NOTE(review): the message says "expired" although this is a children-changed event; the
      // text is kept as-is so existing log consumers are unaffected.
      LOG.info("{} znode expired, triggering peerListChanged event", path);
      for (ReplicationListener rl : listeners) {
        rl.peerListChanged(peers);
      }
    }
  }

  /**
   * Verify if this event is meant for us, and if so then get the latest list of peer ids from
   * {@code replicationPeers}. The original documentation states that this also resets the
   * watches; any such re-registration would happen inside that call (TODO confirm).
   * @param path path to check against
   * @return A list of peers' identifiers if the event concerns this watcher, else null.
   */
  private List<String> refreshPeersList(String path) {
    if (!path.startsWith(getPeersZNode())) {
      return null;
    }
    return this.replicationPeers.getAllPeerIds();
  }

  private String getPeersZNode() {
    return this.peersZNode;
  }

  /**
   * Extracts the znode name (the last "/"-separated component) from a ZK path. Used for both peer
   * and region-server znodes.
   * @param fullPath Path to extract the name from
   * @return the last component, or an empty string if the path has no components (for example a
   *         path made only of slashes)
   */
  private String getZNodeName(String fullPath) {
    String[] parts = fullPath.split("/");
    return parts.length > 0 ? parts[parts.length - 1] : "";
  }

  /**
   * Reads the list of region servers from ZK and atomically clears our local view of it and
   * replaces it with the updated list.
   * @return true if the local list of the other region servers was updated with the ZK data (even
   *         if it was empty), false if the data was missing in ZK
   */
  private boolean refreshOtherRegionServersList() {
    List<String> newRsList = getRegisteredRegionServers();
    if (newRsList == null) {
      return false;
    }
    synchronized (otherRegionServers) {
      otherRegionServers.clear();
      otherRegionServers.addAll(newRsList);
    }
    return true;
  }

  /**
   * Get a list of all the other region servers in this cluster and set a watch
   * @return a list of server names, or null if the lookup returned null or failed with a
   *         KeeperException (in which case the abortable has been asked to abort)
   */
  private List<String> getRegisteredRegionServers() {
    try {
      return ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
    } catch (KeeperException e) {
      this.abortable.abort("Get list of registered region servers", e);
      return null;
    }
  }
}