001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.List; 025import java.util.UUID; 026 027import org.apache.hadoop.hbase.zookeeper.ZKListener; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 032import org.apache.hadoop.hbase.zookeeper.ZKUtil; 033import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 034import org.apache.zookeeper.KeeperException; 035import org.apache.zookeeper.KeeperException.AuthFailedException; 036import org.apache.zookeeper.KeeperException.ConnectionLossException; 037import org.apache.zookeeper.KeeperException.SessionExpiredException; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * A {@link BaseReplicationEndpoint} for replication endpoints whose 043 * target cluster is an HBase cluster. 044 */ 045@InterfaceAudience.Private 046public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint 047 implements Abortable { 048 049 private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); 050 051 private ZKWatcher zkw = null; 052 053 private List<ServerName> regionServers = new ArrayList<>(0); 054 private long lastRegionServerUpdate; 055 056 protected synchronized void disconnect() { 057 if (zkw != null) { 058 zkw.close(); 059 } 060 } 061 062 /** 063 * A private method used to re-establish a zookeeper session with a peer cluster. 064 * @param ke 065 */ 066 protected void reconnect(KeeperException ke) { 067 if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException 068 || ke instanceof AuthFailedException) { 069 String clusterKey = ctx.getPeerConfig().getClusterKey(); 070 LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke); 071 try { 072 reloadZkWatcher(); 073 } catch (IOException io) { 074 LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io); 075 } 076 } 077 } 078 079 @Override 080 public void start() { 081 startAsync(); 082 } 083 084 @Override 085 public void stop() { 086 stopAsync(); 087 } 088 089 @Override 090 protected void doStart() { 091 try { 092 reloadZkWatcher(); 093 notifyStarted(); 094 } catch (IOException e) { 095 notifyFailed(e); 096 } 097 } 098 099 @Override 100 protected void doStop() { 101 disconnect(); 102 notifyStopped(); 103 } 104 105 @Override 106 // Synchronize peer cluster connection attempts to avoid races and rate 107 // limit connections when multiple replication sources try to connect to 108 // the peer cluster. If the peer cluster is down we can get out of control 109 // over time. 110 public synchronized UUID getPeerUUID() { 111 UUID peerUUID = null; 112 try { 113 peerUUID = ZKClusterId.getUUIDForCluster(zkw); 114 } catch (KeeperException ke) { 115 reconnect(ke); 116 } 117 return peerUUID; 118 } 119 120 /** 121 * Get the ZK connection to this peer 122 * @return zk connection 123 */ 124 protected synchronized ZKWatcher getZkw() { 125 return zkw; 126 } 127 128 /** 129 * Closes the current ZKW (if not null) and creates a new one 130 * @throws IOException If anything goes wrong connecting 131 */ 132 synchronized void reloadZkWatcher() throws IOException { 133 if (zkw != null) zkw.close(); 134 zkw = new ZKWatcher(ctx.getConfiguration(), 135 "connection to cluster: " + ctx.getPeerId(), this); 136 getZkw().registerListener(new PeerRegionServerListener(this)); 137 } 138 139 @Override 140 public void abort(String why, Throwable e) { 141 LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId() 142 + " was aborted for the following reason(s):" + why, e); 143 } 144 145 @Override 146 public boolean isAborted() { 147 // Currently this is never "Aborted", we just log when the abort method is called. 148 return false; 149 } 150 151 /** 152 * Get the list of all the region servers from the specified peer 153 * @param zkw zk connection to use 154 * @return list of region server addresses or an empty list if the slave is unavailable 155 */ 156 protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw) 157 throws KeeperException { 158 List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.znodePaths.rsZNode); 159 if (children == null) { 160 return Collections.emptyList(); 161 } 162 List<ServerName> addresses = new ArrayList<>(children.size()); 163 for (String child : children) { 164 addresses.add(ServerName.parseServerName(child)); 165 } 166 return addresses; 167 } 168 169 /** 170 * Get a list of all the addresses of all the region servers 171 * for this peer cluster 172 * @return list of addresses 173 */ 174 // Synchronize peer cluster connection attempts to avoid races and rate 175 // limit connections when multiple replication sources try to connect to 176 // the peer cluster. If the peer cluster is down we can get out of control 177 // over time. 178 public synchronized List<ServerName> getRegionServers() { 179 try { 180 setRegionServers(fetchSlavesAddresses(this.getZkw())); 181 } catch (KeeperException ke) { 182 if (LOG.isDebugEnabled()) { 183 LOG.debug("Fetch slaves addresses failed", ke); 184 } 185 reconnect(ke); 186 } 187 return regionServers; 188 } 189 190 /** 191 * Set the list of region servers for that peer 192 * @param regionServers list of addresses for the region servers 193 */ 194 public synchronized void setRegionServers(List<ServerName> regionServers) { 195 this.regionServers = regionServers; 196 lastRegionServerUpdate = System.currentTimeMillis(); 197 } 198 199 /** 200 * Get the timestamp at which the last change occurred to the list of region servers to replicate 201 * to. 202 * @return The System.currentTimeMillis at the last time the list of peer region servers changed. 203 */ 204 public long getLastRegionServerUpdate() { 205 return lastRegionServerUpdate; 206 } 207 208 /** 209 * Tracks changes to the list of region servers in a peer's cluster. 210 */ 211 public static class PeerRegionServerListener extends ZKListener { 212 213 private final HBaseReplicationEndpoint replicationEndpoint; 214 private final String regionServerListNode; 215 216 public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) { 217 super(replicationPeer.getZkw()); 218 this.replicationEndpoint = replicationPeer; 219 this.regionServerListNode = replicationEndpoint.getZkw().znodePaths.rsZNode; 220 } 221 222 @Override 223 public synchronized void nodeChildrenChanged(String path) { 224 if (path.equals(regionServerListNode)) { 225 try { 226 LOG.info("Detected change to peer region servers, fetching updated list"); 227 replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw())); 228 } catch (KeeperException e) { 229 LOG.error("Error reading slave addresses", e); 230 } 231 } 232 } 233 } 234}