001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.net.URI; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Collections; 025import java.util.EnumSet; 026import java.util.List; 027import java.util.Map; 028import java.util.UUID; 029import java.util.concurrent.ThreadLocalRandom; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Abortable; 032import org.apache.hadoop.hbase.ClusterMetrics; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.client.AsyncClusterConnection; 036import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; 037import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 038import org.apache.hadoop.hbase.client.ConnectionRegistryFactory; 039import org.apache.hadoop.hbase.security.User; 040import org.apache.hadoop.hbase.util.FutureUtils; 041import org.apache.hadoop.hbase.util.ReservoirSample; 042import org.apache.yetus.audience.InterfaceAudience; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 047 048/** 049 * A {@link BaseReplicationEndpoint} for replication endpoints whose target cluster is an HBase 050 * cluster. 051 */ 052@InterfaceAudience.Private 053public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint 054 implements Abortable { 055 056 private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); 057 058 protected Configuration conf; 059 060 private URI clusterURI; 061 062 private final Object connLock = new Object(); 063 064 private volatile AsyncClusterConnection conn; 065 066 /** 067 * Default maximum number of times a replication sink can be reported as bad before it will no 068 * longer be provided as a sink for replication without the pool of replication sinks being 069 * refreshed. 070 */ 071 public static final int DEFAULT_BAD_SINK_THRESHOLD = 3; 072 073 /** 074 * Default ratio of the total number of peer cluster region servers to consider replicating to. 075 */ 076 public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f; 077 078 // Ratio of total number of potential peer region servers to be used 079 private float ratio; 080 081 // Maximum number of times a sink can be reported as bad before the pool of 082 // replication sinks is refreshed 083 private int badSinkThreshold; 084 // Count of "bad replication sink" reports per peer sink 085 private Map<ServerName, Integer> badReportCounts; 086 087 private List<ServerName> sinkServers = new ArrayList<>(0); 088 089 /** 090 * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different 091 * Connection implementations, or initialize it in a different way, so defining createConnection 092 * as protected for possible overridings. 093 */ 094 protected AsyncClusterConnection createConnection(URI clusterURI, Configuration conf) 095 throws IOException { 096 return ClusterConnectionFactory.createAsyncClusterConnection(clusterURI, conf, null, 097 User.getCurrent()); 098 } 099 100 @Override 101 public void init(Context context) throws IOException { 102 super.init(context); 103 this.conf = HBaseConfiguration.create(ctx.getConfiguration()); 104 this.clusterURI = ConnectionRegistryFactory 105 .tryParseAsConnectionURI(context.getReplicationPeer().getPeerConfig().getClusterKey()); 106 this.ratio = 107 ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); 108 this.badSinkThreshold = 109 ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD); 110 this.badReportCounts = Maps.newHashMap(); 111 } 112 113 private void disconnect() { 114 synchronized (connLock) { 115 if (this.conn != null) { 116 try { 117 this.conn.close(); 118 this.conn = null; 119 } catch (IOException e) { 120 LOG.warn("{} Failed to close the connection", ctx.getPeerId()); 121 } 122 } 123 } 124 } 125 126 @Override 127 public void start() { 128 startAsync(); 129 } 130 131 @Override 132 public void stop() { 133 stopAsync(); 134 } 135 136 @Override 137 protected void doStart() { 138 notifyStarted(); 139 } 140 141 @Override 142 protected void doStop() { 143 disconnect(); 144 notifyStopped(); 145 } 146 147 @Override 148 public UUID getPeerUUID() { 149 try { 150 AsyncClusterConnection conn = connect(); 151 String clusterId = FutureUtils 152 .get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID))) 153 .getClusterId(); 154 return UUID.fromString(clusterId); 155 } catch (IOException e) { 156 LOG.warn("Failed to get cluster id for cluster", e); 157 return null; 158 } 159 } 160 161 // do not call this method in doStart method, only initialize the connection to remote cluster 162 // when you actually wants to make use of it. The problem here is that, starting the replication 163 // endpoint is part of the region server initialization work, so if the peer cluster is fully 164 // down and we can not connect to it, we will cause the initialization to fail and crash the 165 // region server, as we need the cluster id while setting up the AsyncClusterConnection, which 166 // needs to at least connect to zookeeper or some other servers in the peer cluster based on 167 // different connection registry implementation 168 private AsyncClusterConnection connect() throws IOException { 169 AsyncClusterConnection c = this.conn; 170 if (c != null) { 171 return c; 172 } 173 synchronized (connLock) { 174 c = this.conn; 175 if (c != null) { 176 return c; 177 } 178 c = createConnection(clusterURI, conf); 179 conn = c; 180 } 181 return c; 182 } 183 184 @Override 185 public void abort(String why, Throwable e) { 186 LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId() 187 + " was aborted for the following reason(s):" + why, e); 188 } 189 190 @Override 191 public boolean isAborted() { 192 // Currently this is never "Aborted", we just log when the abort method is called. 193 return false; 194 } 195 196 /** 197 * Get the list of all the region servers from the specified peer 198 * @return list of region server addresses or an empty list if the slave is unavailable 199 */ 200 // will be overrided in tests so protected 201 protected Collection<ServerName> fetchPeerAddresses() { 202 try { 203 return FutureUtils.get(connect().getAdmin().getRegionServers(true)); 204 } catch (IOException e) { 205 LOG.debug("Fetch peer addresses failed", e); 206 return Collections.emptyList(); 207 } 208 } 209 210 protected synchronized void chooseSinks() { 211 Collection<ServerName> slaveAddresses = fetchPeerAddresses(); 212 if (slaveAddresses.isEmpty()) { 213 LOG.warn("No sinks available at peer. Will not be able to replicate"); 214 this.sinkServers = Collections.emptyList(); 215 } else { 216 int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); 217 ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks); 218 sample.add(slaveAddresses.iterator()); 219 this.sinkServers = sample.getSamplingResult(); 220 } 221 badReportCounts.clear(); 222 } 223 224 protected synchronized int getNumSinks() { 225 return sinkServers.size(); 226 } 227 228 /** 229 * Get a randomly-chosen replication sink to replicate to. 230 * @return a replication sink to replicate to 231 */ 232 protected synchronized SinkPeer getReplicationSink() throws IOException { 233 if (sinkServers.isEmpty()) { 234 LOG.info("Current list of sinks is out of date or empty, updating"); 235 chooseSinks(); 236 } 237 if (sinkServers.isEmpty()) { 238 throw new IOException("No replication sinks are available"); 239 } 240 ServerName serverName = 241 sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size())); 242 return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName)); 243 } 244 245 /** 246 * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single 247 * SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed 248 * from the pool of potential replication targets. 249 * @param sinkPeer The SinkPeer that had a failed replication attempt on it 250 */ 251 protected synchronized void reportBadSink(SinkPeer sinkPeer) { 252 ServerName serverName = sinkPeer.getServerName(); 253 int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1); 254 if (badReportCount > badSinkThreshold) { 255 this.sinkServers.remove(serverName); 256 if (sinkServers.isEmpty()) { 257 chooseSinks(); 258 } 259 } 260 } 261 262 /** 263 * Report that a {@code SinkPeer} successfully replicated a chunk of data. The SinkPeer that had a 264 * failed replication attempt on it 265 */ 266 protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) { 267 badReportCounts.remove(sinkPeer.getServerName()); 268 } 269 270 List<ServerName> getSinkServers() { 271 return sinkServers; 272 } 273 274 /** 275 * Wraps a replication region server sink to provide the ability to identify it. 276 */ 277 public static class SinkPeer { 278 private ServerName serverName; 279 private AsyncRegionServerAdmin regionServer; 280 281 public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) { 282 this.serverName = serverName; 283 this.regionServer = regionServer; 284 } 285 286 ServerName getServerName() { 287 return serverName; 288 } 289 290 public AsyncRegionServerAdmin getRegionServer() { 291 return regionServer; 292 } 293 } 294}