001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.util.ArrayList; 021import java.util.List; 022import java.util.concurrent.CopyOnWriteArrayList; 023import org.apache.hadoop.hbase.Abortable; 024import org.apache.hadoop.hbase.Stoppable; 025import org.apache.hadoop.hbase.zookeeper.ZKListener; 026import org.apache.hadoop.hbase.zookeeper.ZKUtil; 027import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.zookeeper.KeeperException; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * This class is a ZooKeeper implementation of the ReplicationTracker interface. This class is 035 * responsible for handling replication events that are defined in the ReplicationListener 036 * interface. 037 */ 038@InterfaceAudience.Private 039public class ReplicationTrackerZKImpl implements ReplicationTracker { 040 041 private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class); 042 043 // Zookeeper 044 private final ZKWatcher zookeeper; 045 // Server to abort. 046 private final Abortable abortable; 047 // All about stopping 048 private final Stoppable stopper; 049 // listeners to be notified 050 private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>(); 051 // List of all the other region servers in this cluster 052 private final ArrayList<String> otherRegionServers = new ArrayList<>(); 053 054 public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) { 055 this.zookeeper = zookeeper; 056 this.abortable = abortable; 057 this.stopper = stopper; 058 this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); 059 // watch the changes 060 refreshOtherRegionServersList(true); 061 } 062 063 @Override 064 public void registerListener(ReplicationListener listener) { 065 listeners.add(listener); 066 } 067 068 @Override 069 public void removeListener(ReplicationListener listener) { 070 listeners.remove(listener); 071 } 072 073 /** 074 * Return a snapshot of the current region servers. 075 */ 076 @Override 077 public List<String> getListOfRegionServers() { 078 refreshOtherRegionServersList(false); 079 080 List<String> list = null; 081 synchronized (otherRegionServers) { 082 list = new ArrayList<>(otherRegionServers); 083 } 084 return list; 085 } 086 087 /** 088 * Watcher used to be notified of the other region server's death in the local cluster. It 089 * initiates the process to transfer the queues if it is able to grab the lock. 090 */ 091 public class OtherRegionServerWatcher extends ZKListener { 092 093 /** 094 * Construct a ZooKeeper event listener. 095 */ 096 public OtherRegionServerWatcher(ZKWatcher watcher) { 097 super(watcher); 098 } 099 100 /** 101 * Called when a new node has been created. 102 * @param path full path of the new node 103 */ 104 @Override 105 public void nodeCreated(String path) { 106 refreshListIfRightPath(path); 107 } 108 109 /** 110 * Called when a node has been deleted 111 * @param path full path of the deleted node 112 */ 113 @Override 114 public void nodeDeleted(String path) { 115 if (stopper.isStopped()) { 116 return; 117 } 118 boolean cont = refreshListIfRightPath(path); 119 if (!cont) { 120 return; 121 } 122 LOG.info(path + " znode expired, triggering replicatorRemoved event"); 123 for (ReplicationListener rl : listeners) { 124 rl.regionServerRemoved(getZNodeName(path)); 125 } 126 } 127 128 /** 129 * Called when an existing node has a child node added or removed. 130 * @param path full path of the node whose children have changed 131 */ 132 @Override 133 public void nodeChildrenChanged(String path) { 134 if (stopper.isStopped()) { 135 return; 136 } 137 refreshListIfRightPath(path); 138 } 139 140 private boolean refreshListIfRightPath(String path) { 141 if (!path.startsWith(this.watcher.getZNodePaths().rsZNode)) { 142 return false; 143 } 144 return refreshOtherRegionServersList(true); 145 } 146 } 147 148 /** 149 * Extracts the znode name of a peer cluster from a ZK path 150 * @param fullPath Path to extract the id from 151 * @return the id or an empty string if path is invalid 152 */ 153 private String getZNodeName(String fullPath) { 154 String[] parts = fullPath.split("/"); 155 return parts.length > 0 ? parts[parts.length - 1] : ""; 156 } 157 158 /** 159 * Reads the list of region servers from ZK and atomically clears our local view of it and 160 * replaces it with the updated list. 161 * @return true if the local list of the other region servers was updated with the ZK data (even 162 * if it was empty), false if the data was missing in ZK 163 */ 164 private boolean refreshOtherRegionServersList(boolean watch) { 165 List<String> newRsList = getRegisteredRegionServers(watch); 166 if (newRsList == null) { 167 return false; 168 } else { 169 synchronized (otherRegionServers) { 170 otherRegionServers.clear(); 171 otherRegionServers.addAll(newRsList); 172 } 173 } 174 return true; 175 } 176 177 /** 178 * Get a list of all the other region servers in this cluster and set a watch 179 * @return a list of server nanes 180 */ 181 private List<String> getRegisteredRegionServers(boolean watch) { 182 List<String> result = null; 183 try { 184 if (watch) { 185 result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, 186 this.zookeeper.getZNodePaths().rsZNode); 187 } else { 188 result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.getZNodePaths().rsZNode); 189 } 190 } catch (KeeperException e) { 191 this.abortable.abort("Get list of registered region servers", e); 192 } 193 return result; 194 } 195}