/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * A {@link BaseReplicationEndpoint} for replication endpoints whose target cluster is an HBase
 * cluster.
 * <p>
 * Access to the ZooKeeper watcher is guarded by a dedicated lock object; the sink selection and
 * bad-sink bookkeeping methods are {@code synchronized} on this endpoint instance.
 */
@InterfaceAudience.Private
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
  implements Abortable {

  private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);

  // ZooKeeper watcher for the peer cluster; every read and write happens under zkwLock.
  private ZKWatcher zkw = null;
  private final Object zkwLock = new Object();

  // Copy of the context configuration, created in init() and handed to createConnection().
  protected Configuration conf;

  // Connection created by connectPeerCluster() and closed by disconnect().
  private AsyncClusterConnection conn;

  /**
   * Default maximum number of times a replication sink can be reported as bad before it will no
   * longer be provided as a sink for replication without the pool of replication sinks being
   * refreshed.
   */
  public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;

  /**
   * Default ratio of the total number of peer cluster region servers to consider replicating to.
   */
  public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;

  // Ratio of total number of potential peer region servers to be used
  private float ratio;

  // Maximum number of times a sink can be reported as bad before the pool of
  // replication sinks is refreshed
  private int badSinkThreshold;
  // Count of "bad replication sink" reports per peer sink
  private Map<ServerName, Integer> badReportCounts;

  // Currently selected sinks; replaced wholesale by chooseSinks().
  private List<ServerName> sinkServers = new ArrayList<>(0);

  /*
   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiating
   * different Connection implementations, or initializing it in a different way, so
   * createConnection is defined as protected to allow overriding.
   */
  protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
  }

  /**
   * Initializes this endpoint from the given context: copies the configuration and reads the
   * {@code replication.source.ratio} and {@code replication.bad.sink.threshold} settings.
   * @param context the replication endpoint context
   * @throws IOException if the superclass initialization fails
   */
  @Override
  public void init(Context context) throws IOException {
    super.init(context);
    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
    this.ratio =
      ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
    this.badSinkThreshold =
      ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
    this.badReportCounts = Maps.newHashMap();
  }

  /**
   * Closes the ZooKeeper watcher (if any) and the cluster connection (if any). A failure to close
   * the connection is logged, including its cause, and otherwise ignored.
   */
  protected void disconnect() {
    synchronized (zkwLock) {
      if (zkw != null) {
        zkw.close();
      }
    }
    if (this.conn != null) {
      try {
        this.conn.close();
        this.conn = null;
      } catch (IOException e) {
        // Pass the exception to the logger so the cause of the failed close is not lost.
        LOG.warn("{} Failed to close the connection", ctx.getPeerId(), e);
      }
    }
  }

  /**
   * A private method used to re-establish a zookeeper session with a peer cluster. The watcher is
   * only reloaded for connection loss, session expiry and authentication failure; any other
   * {@link KeeperException} is ignored here.
   * @param ke the exception raised by the failed ZooKeeper operation
   */
  private void reconnect(KeeperException ke) {
    if (
      ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
        || ke instanceof AuthFailedException
    ) {
      String clusterKey = ctx.getPeerConfig().getClusterKey();
      LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
      try {
        reloadZkWatcher();
      } catch (IOException io) {
        LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
      }
    }
  }

  @Override
  public void start() {
    startAsync();
  }

  @Override
  public void stop() {
    stopAsync();
  }

  /**
   * Creates the ZooKeeper watcher and the peer cluster connection, then reports the service as
   * started. An {@link IOException} from either step is reported through {@code notifyFailed}.
   */
  @Override
  protected void doStart() {
    try {
      reloadZkWatcher();
      connectPeerCluster();
      notifyStarted();
    } catch (IOException e) {
      notifyFailed(e);
    }
  }

  /**
   * Releases the ZooKeeper watcher and the connection, then reports the service as stopped.
   */
  @Override
  protected void doStop() {
    disconnect();
    notifyStopped();
  }

  // Synchronize peer cluster connection attempts to avoid races and rate
  // limit connections when multiple replication sources try to connect to
  // the peer cluster. If the peer cluster is down we can get out of control
  // over time.
  /**
   * Reads the UUID of the peer cluster through the ZooKeeper watcher.
   * @return the peer cluster UUID, or {@code null} if the lookup raised a {@link KeeperException}
   */
  @Override
  public UUID getPeerUUID() {
    UUID peerUUID = null;
    try {
      synchronized (zkwLock) {
        peerUUID = ZKClusterId.getUUIDForCluster(zkw);
      }
    } catch (KeeperException ke) {
      reconnect(ke);
    }
    return peerUUID;
  }

  /**
   * Closes the current ZKW (if not null) and creates a new one
   * @throws IOException If anything goes wrong connecting
   */
  private void reloadZkWatcher() throws IOException {
    synchronized (zkwLock) {
      if (zkw != null) {
        zkw.close();
      }
      zkw =
        new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this);
      // The listener constructor reads this.zkw, so it must be created after the assignment above.
      zkw.registerListener(new PeerRegionServerListener(this));
    }
  }

  /**
   * Creates the connection to the peer cluster via {@link #createConnection(Configuration)}.
   * @throws IOException if the connection cannot be created; the failure is logged and rethrown
   */
  private void connectPeerCluster() throws IOException {
    try {
      conn = createConnection(this.conf);
    } catch (IOException ioe) {
      LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
      throw ioe;
    }
  }

  /**
   * Logs the abort request. This endpoint does not change state when aborted.
   * @param why the reason for the abort
   * @param e   the throwable that caused the abort, may be {@code null}
   */
  @Override
  public void abort(String why, Throwable e) {
    LOG.error("The HBaseReplicationEndpoint corresponding to peer {}"
      + " was aborted for the following reason(s):{}", ctx.getPeerId(), why, e);
  }

  @Override
  public boolean isAborted() {
    // Currently this is never "Aborted", we just log when the abort method is called.
    return false;
  }

  /**
   * Get the list of all the region servers from the specified peer
   * @return list of region server addresses or an empty list if the slave is unavailable
   */
  protected List<ServerName> fetchSlavesAddresses() {
    List<String> children = null;
    try {
      synchronized (zkwLock) {
        children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
      }
    } catch (KeeperException ke) {
      LOG.debug("Fetch slaves addresses failed", ke);
      reconnect(ke);
    }
    if (children == null) {
      return Collections.emptyList();
    }
    List<ServerName> addresses = new ArrayList<>(children.size());
    for (String child : children) {
      addresses.add(ServerName.parseServerName(child));
    }
    return addresses;
  }

  /**
   * Refreshes the pool of replication sinks: fetches the peer's region servers, shuffles them,
   * keeps the configured ratio of them as sinks and resets all bad-sink report counts.
   */
  protected synchronized void chooseSinks() {
    List<ServerName> slaveAddresses = fetchSlavesAddresses();
    if (slaveAddresses.isEmpty()) {
      LOG.warn("No sinks available at peer. Will not be able to replicate");
    }
    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
    int available = slaveAddresses.size();
    int numSinks = (int) Math.ceil(available * ratio);
    // The ratio comes straight from configuration; a value outside [0, 1] would otherwise make
    // subList() throw, so clamp the sink count to the number of servers actually available.
    numSinks = Math.max(0, Math.min(numSinks, available));
    this.sinkServers = slaveAddresses.subList(0, numSinks);
    badReportCounts.clear();
  }

  /**
   * Returns the number of sinks currently in the pool.
   * @return the size of the current sink list
   */
  protected synchronized int getNumSinks() {
    return sinkServers.size();
  }

  /**
   * Get a randomly-chosen replication sink to replicate to. If the pool is empty it is refreshed
   * once before giving up.
   * @return a replication sink to replicate to
   * @throws IOException if no sinks are available even after refreshing the pool
   */
  protected synchronized SinkPeer getReplicationSink() throws IOException {
    if (sinkServers.isEmpty()) {
      LOG.info("Current list of sinks is out of date or empty, updating");
      chooseSinks();
    }
    if (sinkServers.isEmpty()) {
      throw new IOException("No replication sinks are available");
    }
    ServerName serverName =
      sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
  }

  /**
   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single
   * SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed
   * from the pool of potential replication targets. When that removal empties the pool, the pool
   * is refreshed.
   * @param sinkPeer The SinkPeer that had a failed replication attempt on it
   */
  protected synchronized void reportBadSink(SinkPeer sinkPeer) {
    ServerName serverName = sinkPeer.getServerName();
    int badReportCount = badReportCounts.merge(serverName, 1, Integer::sum);
    if (badReportCount > badSinkThreshold) {
      this.sinkServers.remove(serverName);
      if (sinkServers.isEmpty()) {
        chooseSinks();
      }
    }
  }

  /**
   * Report that a {@code SinkPeer} successfully replicated a chunk of data. This clears the
   * bad-report count recorded for that sink.
   * @param sinkPeer The SinkPeer that had a successful replication attempt on it
   */
  protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
    badReportCounts.remove(sinkPeer.getServerName());
  }

  /**
   * Returns the current sink list itself (not a copy), without synchronization.
   * @return the list of currently selected sink servers
   */
  List<ServerName> getSinkServers() {
    return sinkServers;
  }

  /**
   * Tracks changes to the list of region servers in a peer's cluster.
   */
  public static class PeerRegionServerListener extends ZKListener {

    private final HBaseReplicationEndpoint replicationEndpoint;
    private final String regionServerListNode;

    /**
     * Creates a listener bound to the endpoint's current ZooKeeper watcher.
     * @param endpoint the endpoint whose sinks are re-chosen when the peer's servers change
     */
    public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
      super(endpoint.zkw);
      this.replicationEndpoint = endpoint;
      this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
    }

    /**
     * Re-chooses the replication sinks when the children of the peer's region server znode change;
     * changes under any other path are ignored.
     * @param path the znode whose children changed
     */
    @Override
    public synchronized void nodeChildrenChanged(String path) {
      if (path.equals(regionServerListNode)) {
        LOG.info("Detected change to peer region servers, fetching updated list");
        replicationEndpoint.chooseSinks();
      }
    }
  }

  /**
   * Wraps a replication region server sink to provide the ability to identify it.
   */
  public static class SinkPeer {
    private final ServerName serverName;
    private final AsyncRegionServerAdmin regionServer;

    /**
     * @param serverName   the name of the sink region server
     * @param regionServer the admin handle used to talk to that region server
     */
    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
      this.serverName = serverName;
      this.regionServer = regionServer;
    }

    ServerName getServerName() {
      return serverName;
    }

    public AsyncRegionServerAdmin getRegionServer() {
      return regionServer;
    }
  }
}