001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.List; 025import java.util.UUID; 026 027import org.apache.hadoop.hbase.zookeeper.ZKListener; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 032import org.apache.hadoop.hbase.zookeeper.ZKUtil; 033import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 034import org.apache.zookeeper.KeeperException; 035import org.apache.zookeeper.KeeperException.AuthFailedException; 036import org.apache.zookeeper.KeeperException.ConnectionLossException; 037import org.apache.zookeeper.KeeperException.SessionExpiredException; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * A {@link BaseReplicationEndpoint} for replication endpoints whose 043 * target cluster is an HBase cluster. 044 */ 045@InterfaceAudience.Private 046public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint 047 implements Abortable { 048 049 private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); 050 051 private ZKWatcher zkw = null; 052 053 private List<ServerName> regionServers = new ArrayList<>(0); 054 private long lastRegionServerUpdate; 055 056 protected synchronized void disconnect() { 057 if (zkw != null) { 058 zkw.close(); 059 } 060 } 061 062 /** 063 * A private method used to re-establish a zookeeper session with a peer cluster. 064 * @param ke 065 */ 066 protected void reconnect(KeeperException ke) { 067 if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException 068 || ke instanceof AuthFailedException) { 069 String clusterKey = ctx.getPeerConfig().getClusterKey(); 070 LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke); 071 try { 072 reloadZkWatcher(); 073 } catch (IOException io) { 074 LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io); 075 } 076 } 077 } 078 079 @Override 080 public void start() { 081 startAsync(); 082 } 083 084 @Override 085 public void stop() { 086 stopAsync(); 087 } 088 089 @Override 090 protected void doStart() { 091 try { 092 reloadZkWatcher(); 093 notifyStarted(); 094 } catch (IOException e) { 095 notifyFailed(e); 096 } 097 } 098 099 @Override 100 protected void doStop() { 101 disconnect(); 102 notifyStopped(); 103 } 104 105 @Override 106 // Synchronize peer cluster connection attempts to avoid races and rate 107 // limit connections when multiple replication sources try to connect to 108 // the peer cluster. If the peer cluster is down we can get out of control 109 // over time. 110 public synchronized UUID getPeerUUID() { 111 UUID peerUUID = null; 112 try { 113 peerUUID = ZKClusterId.getUUIDForCluster(zkw); 114 } catch (KeeperException ke) { 115 reconnect(ke); 116 } 117 return peerUUID; 118 } 119 120 /** 121 * Get the ZK connection to this peer 122 * @return zk connection 123 */ 124 protected synchronized ZKWatcher getZkw() { 125 return zkw; 126 } 127 128 /** 129 * Closes the current ZKW (if not null) and creates a new one 130 * @throws IOException If anything goes wrong connecting 131 */ 132 synchronized void reloadZkWatcher() throws IOException { 133 if (zkw != null) zkw.close(); 134 zkw = new ZKWatcher(ctx.getConfiguration(), 135 "connection to cluster: " + ctx.getPeerId(), this); 136 getZkw().registerListener(new PeerRegionServerListener(this)); 137 } 138 139 @Override 140 public void abort(String why, Throwable e) { 141 LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId() 142 + " was aborted for the following reason(s):" + why, e); 143 } 144 145 @Override 146 public boolean isAborted() { 147 // Currently this is never "Aborted", we just log when the abort method is called. 148 return false; 149 } 150 151 /** 152 * Get the list of all the region servers from the specified peer 153 * @param zkw zk connection to use 154 * @return list of region server addresses or an empty list if the slave is unavailable 155 */ 156 protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw) 157 throws KeeperException { 158 List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, 159 zkw.getZNodePaths().rsZNode); 160 if (children == null) { 161 return Collections.emptyList(); 162 } 163 List<ServerName> addresses = new ArrayList<>(children.size()); 164 for (String child : children) { 165 addresses.add(ServerName.parseServerName(child)); 166 } 167 return addresses; 168 } 169 170 /** 171 * Get a list of all the addresses of all the region servers 172 * for this peer cluster 173 * @return list of addresses 174 */ 175 // Synchronize peer cluster connection attempts to avoid races and rate 176 // limit connections when multiple replication sources try to connect to 177 // the peer cluster. If the peer cluster is down we can get out of control 178 // over time. 179 public synchronized List<ServerName> getRegionServers() { 180 try { 181 setRegionServers(fetchSlavesAddresses(this.getZkw())); 182 } catch (KeeperException ke) { 183 if (LOG.isDebugEnabled()) { 184 LOG.debug("Fetch slaves addresses failed", ke); 185 } 186 reconnect(ke); 187 } 188 return regionServers; 189 } 190 191 /** 192 * Set the list of region servers for that peer 193 * @param regionServers list of addresses for the region servers 194 */ 195 public synchronized void setRegionServers(List<ServerName> regionServers) { 196 this.regionServers = regionServers; 197 lastRegionServerUpdate = System.currentTimeMillis(); 198 } 199 200 /** 201 * Get the timestamp at which the last change occurred to the list of region servers to replicate 202 * to. 203 * @return The System.currentTimeMillis at the last time the list of peer region servers changed. 204 */ 205 public long getLastRegionServerUpdate() { 206 return lastRegionServerUpdate; 207 } 208 209 /** 210 * Tracks changes to the list of region servers in a peer's cluster. 211 */ 212 public static class PeerRegionServerListener extends ZKListener { 213 214 private final HBaseReplicationEndpoint replicationEndpoint; 215 private final String regionServerListNode; 216 217 public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) { 218 super(replicationPeer.getZkw()); 219 this.replicationEndpoint = replicationPeer; 220 this.regionServerListNode = replicationEndpoint.getZkw().getZNodePaths().rsZNode; 221 } 222 223 @Override 224 public synchronized void nodeChildrenChanged(String path) { 225 if (path.equals(regionServerListNode)) { 226 try { 227 LOG.info("Detected change to peer region servers, fetching updated list"); 228 replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw())); 229 } catch (KeeperException e) { 230 LOG.error("Error reading slave addresses", e); 231 } 232 } 233 } 234 } 235}