001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.List; 024import java.util.UUID; 025import org.apache.hadoop.hbase.Abortable; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 029import org.apache.hadoop.hbase.zookeeper.ZKListener; 030import org.apache.hadoop.hbase.zookeeper.ZKUtil; 031import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.apache.zookeeper.KeeperException; 034import org.apache.zookeeper.KeeperException.AuthFailedException; 035import org.apache.zookeeper.KeeperException.ConnectionLossException; 036import org.apache.zookeeper.KeeperException.SessionExpiredException; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A {@link BaseReplicationEndpoint} for replication endpoints whose target cluster is an HBase 042 * cluster. 043 */ 044@InterfaceAudience.Private 045public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint 046 implements Abortable { 047 048 private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); 049 050 private ZKWatcher zkw = null; 051 052 private List<ServerName> regionServers = new ArrayList<>(0); 053 private long lastRegionServerUpdate; 054 055 protected synchronized void disconnect() { 056 if (zkw != null) { 057 zkw.close(); 058 } 059 } 060 061 /** 062 * A private method used to re-establish a zookeeper session with a peer cluster. n 063 */ 064 protected void reconnect(KeeperException ke) { 065 if ( 066 ke instanceof ConnectionLossException || ke instanceof SessionExpiredException 067 || ke instanceof AuthFailedException 068 ) { 069 String clusterKey = ctx.getPeerConfig().getClusterKey(); 070 LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke); 071 try { 072 reloadZkWatcher(); 073 } catch (IOException io) { 074 LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io); 075 } 076 } 077 } 078 079 @Override 080 public void start() { 081 startAsync(); 082 } 083 084 @Override 085 public void stop() { 086 stopAsync(); 087 } 088 089 @Override 090 protected void doStart() { 091 try { 092 reloadZkWatcher(); 093 notifyStarted(); 094 } catch (IOException e) { 095 notifyFailed(e); 096 } 097 } 098 099 @Override 100 protected void doStop() { 101 disconnect(); 102 notifyStopped(); 103 } 104 105 @Override 106 // Synchronize peer cluster connection attempts to avoid races and rate 107 // limit connections when multiple replication sources try to connect to 108 // the peer cluster. If the peer cluster is down we can get out of control 109 // over time. 110 public synchronized UUID getPeerUUID() { 111 UUID peerUUID = null; 112 try { 113 peerUUID = ZKClusterId.getUUIDForCluster(zkw); 114 } catch (KeeperException ke) { 115 reconnect(ke); 116 } 117 return peerUUID; 118 } 119 120 /** 121 * Get the ZK connection to this peer 122 * @return zk connection 123 */ 124 protected synchronized ZKWatcher getZkw() { 125 return zkw; 126 } 127 128 /** 129 * Closes the current ZKW (if not null) and creates a new one 130 * @throws IOException If anything goes wrong connecting 131 */ 132 synchronized void reloadZkWatcher() throws IOException { 133 if (zkw != null) zkw.close(); 134 zkw = new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this); 135 getZkw().registerListener(new PeerRegionServerListener(this)); 136 } 137 138 @Override 139 public void abort(String why, Throwable e) { 140 LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId() 141 + " was aborted for the following reason(s):" + why, e); 142 } 143 144 @Override 145 public boolean isAborted() { 146 // Currently this is never "Aborted", we just log when the abort method is called. 147 return false; 148 } 149 150 /** 151 * Get the list of all the region servers from the specified peer 152 * @param zkw zk connection to use 153 * @return list of region server addresses or an empty list if the slave is unavailable 154 */ 155 protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw) throws KeeperException { 156 List<String> children = 157 ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode); 158 if (children == null) { 159 return Collections.emptyList(); 160 } 161 List<ServerName> addresses = new ArrayList<>(children.size()); 162 for (String child : children) { 163 addresses.add(ServerName.parseServerName(child)); 164 } 165 return addresses; 166 } 167 168 /** 169 * Get a list of all the addresses of all the available region servers for this peer cluster, or 170 * an empty list if no region servers available at peer cluster. 171 * @return list of addresses 172 */ 173 // Synchronize peer cluster connection attempts to avoid races and rate 174 // limit connections when multiple replication sources try to connect to 175 // the peer cluster. If the peer cluster is down we can get out of control 176 // over time. 177 public synchronized List<ServerName> getRegionServers() { 178 try { 179 setRegionServers(fetchSlavesAddresses(this.getZkw())); 180 } catch (KeeperException ke) { 181 if (LOG.isDebugEnabled()) { 182 LOG.debug("Fetch slaves addresses failed", ke); 183 } 184 reconnect(ke); 185 } 186 return regionServers; 187 } 188 189 /** 190 * Set the list of region servers for that peer 191 * @param regionServers list of addresses for the region servers 192 */ 193 public synchronized void setRegionServers(List<ServerName> regionServers) { 194 this.regionServers = regionServers; 195 lastRegionServerUpdate = EnvironmentEdgeManager.currentTime(); 196 } 197 198 /** 199 * Get the timestamp at which the last change occurred to the list of region servers to replicate 200 * to. 201 * @return The last time the list of peer region servers changed. 202 */ 203 public long getLastRegionServerUpdate() { 204 return lastRegionServerUpdate; 205 } 206 207 /** 208 * Tracks changes to the list of region servers in a peer's cluster. 209 */ 210 public static class PeerRegionServerListener extends ZKListener { 211 212 private final HBaseReplicationEndpoint replicationEndpoint; 213 private final String regionServerListNode; 214 215 public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) { 216 super(replicationPeer.getZkw()); 217 this.replicationEndpoint = replicationPeer; 218 this.regionServerListNode = replicationEndpoint.getZkw().getZNodePaths().rsZNode; 219 } 220 221 @Override 222 public synchronized void nodeChildrenChanged(String path) { 223 if (path.equals(regionServerListNode)) { 224 try { 225 LOG.info("Detected change to peer region servers, fetching updated list"); 226 replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw())); 227 } catch (KeeperException e) { 228 LOG.error("Error reading slave addresses", e); 229 } 230 } 231 } 232 } 233}