001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.EnumSet; 025import java.util.List; 026import java.util.Map; 027import java.util.UUID; 028import java.util.concurrent.ThreadLocalRandom; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.Abortable; 031import org.apache.hadoop.hbase.ClusterMetrics; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.ServerName; 034import org.apache.hadoop.hbase.client.AsyncClusterConnection; 035import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; 036import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 037import org.apache.hadoop.hbase.security.User; 038import org.apache.hadoop.hbase.util.FutureUtils; 039import org.apache.hadoop.hbase.util.ReservoirSample; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 045 046/** 047 * A {@link BaseReplicationEndpoint} for replication endpoints whose target cluster is an HBase 048 * cluster. 049 */ 050@InterfaceAudience.Private 051public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint 052 implements Abortable { 053 054 private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); 055 056 protected Configuration conf; 057 058 private final Object connLock = new Object(); 059 060 private volatile AsyncClusterConnection conn; 061 062 /** 063 * Default maximum number of times a replication sink can be reported as bad before it will no 064 * longer be provided as a sink for replication without the pool of replication sinks being 065 * refreshed. 066 */ 067 public static final int DEFAULT_BAD_SINK_THRESHOLD = 3; 068 069 /** 070 * Default ratio of the total number of peer cluster region servers to consider replicating to. 071 */ 072 public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f; 073 074 // Ratio of total number of potential peer region servers to be used 075 private float ratio; 076 077 // Maximum number of times a sink can be reported as bad before the pool of 078 // replication sinks is refreshed 079 private int badSinkThreshold; 080 // Count of "bad replication sink" reports per peer sink 081 private Map<ServerName, Integer> badReportCounts; 082 083 private List<ServerName> sinkServers = new ArrayList<>(0); 084 085 /* 086 * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different 087 * Connection implementations, or initialize it in a different way, so defining createConnection 088 * as protected for possible overridings. 089 */ 090 protected AsyncClusterConnection createConnection(Configuration conf) throws IOException { 091 return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent()); 092 } 093 094 @Override 095 public void init(Context context) throws IOException { 096 super.init(context); 097 this.conf = HBaseConfiguration.create(ctx.getConfiguration()); 098 this.ratio = 099 ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); 100 this.badSinkThreshold = 101 ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD); 102 this.badReportCounts = Maps.newHashMap(); 103 } 104 105 private void disconnect() { 106 synchronized (connLock) { 107 if (this.conn != null) { 108 try { 109 this.conn.close(); 110 this.conn = null; 111 } catch (IOException e) { 112 LOG.warn("{} Failed to close the connection", ctx.getPeerId()); 113 } 114 } 115 } 116 } 117 118 @Override 119 public void start() { 120 startAsync(); 121 } 122 123 @Override 124 public void stop() { 125 stopAsync(); 126 } 127 128 @Override 129 protected void doStart() { 130 notifyStarted(); 131 } 132 133 @Override 134 protected void doStop() { 135 disconnect(); 136 notifyStopped(); 137 } 138 139 @Override 140 public UUID getPeerUUID() { 141 try { 142 AsyncClusterConnection conn = connect(); 143 String clusterId = FutureUtils 144 .get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID))) 145 .getClusterId(); 146 return UUID.fromString(clusterId); 147 } catch (IOException e) { 148 LOG.warn("Failed to get cluster id for cluster", e); 149 return null; 150 } 151 } 152 153 // do not call this method in doStart method, only initialize the connection to remote cluster 154 // when you actually wants to make use of it. The problem here is that, starting the replication 155 // endpoint is part of the region server initialization work, so if the peer cluster is fully 156 // down and we can not connect to it, we will cause the initialization to fail and crash the 157 // region server, as we need the cluster id while setting up the AsyncClusterConnection, which 158 // needs to at least connect to zookeeper or some other servers in the peer cluster based on 159 // different connection registry implementation 160 private AsyncClusterConnection connect() throws IOException { 161 AsyncClusterConnection c = this.conn; 162 if (c != null) { 163 return c; 164 } 165 synchronized (connLock) { 166 c = this.conn; 167 if (c != null) { 168 return c; 169 } 170 c = createConnection(this.conf); 171 conn = c; 172 } 173 return c; 174 } 175 176 @Override 177 public void abort(String why, Throwable e) { 178 LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId() 179 + " was aborted for the following reason(s):" + why, e); 180 } 181 182 @Override 183 public boolean isAborted() { 184 // Currently this is never "Aborted", we just log when the abort method is called. 185 return false; 186 } 187 188 /** 189 * Get the list of all the region servers from the specified peer 190 * @return list of region server addresses or an empty list if the slave is unavailable 191 */ 192 // will be overrided in tests so protected 193 protected Collection<ServerName> fetchPeerAddresses() { 194 try { 195 return FutureUtils.get(connect().getAdmin().getRegionServers(true)); 196 } catch (IOException e) { 197 LOG.debug("Fetch peer addresses failed", e); 198 return Collections.emptyList(); 199 } 200 } 201 202 protected synchronized void chooseSinks() { 203 Collection<ServerName> slaveAddresses = fetchPeerAddresses(); 204 if (slaveAddresses.isEmpty()) { 205 LOG.warn("No sinks available at peer. Will not be able to replicate"); 206 this.sinkServers = Collections.emptyList(); 207 } else { 208 int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); 209 ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks); 210 sample.add(slaveAddresses.iterator()); 211 this.sinkServers = sample.getSamplingResult(); 212 } 213 badReportCounts.clear(); 214 } 215 216 protected synchronized int getNumSinks() { 217 return sinkServers.size(); 218 } 219 220 /** 221 * Get a randomly-chosen replication sink to replicate to. 222 * @return a replication sink to replicate to 223 */ 224 protected synchronized SinkPeer getReplicationSink() throws IOException { 225 if (sinkServers.isEmpty()) { 226 LOG.info("Current list of sinks is out of date or empty, updating"); 227 chooseSinks(); 228 } 229 if (sinkServers.isEmpty()) { 230 throw new IOException("No replication sinks are available"); 231 } 232 ServerName serverName = 233 sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size())); 234 return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName)); 235 } 236 237 /** 238 * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single 239 * SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed 240 * from the pool of potential replication targets. 241 * @param sinkPeer The SinkPeer that had a failed replication attempt on it 242 */ 243 protected synchronized void reportBadSink(SinkPeer sinkPeer) { 244 ServerName serverName = sinkPeer.getServerName(); 245 int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1); 246 if (badReportCount > badSinkThreshold) { 247 this.sinkServers.remove(serverName); 248 if (sinkServers.isEmpty()) { 249 chooseSinks(); 250 } 251 } 252 } 253 254 /** 255 * Report that a {@code SinkPeer} successfully replicated a chunk of data. The SinkPeer that had a 256 * failed replication attempt on it 257 */ 258 protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) { 259 badReportCounts.remove(sinkPeer.getServerName()); 260 } 261 262 List<ServerName> getSinkServers() { 263 return sinkServers; 264 } 265 266 /** 267 * Wraps a replication region server sink to provide the ability to identify it. 268 */ 269 public static class SinkPeer { 270 private ServerName serverName; 271 private AsyncRegionServerAdmin regionServer; 272 273 public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) { 274 this.serverName = serverName; 275 this.regionServer = regionServer; 276 } 277 278 ServerName getServerName() { 279 return serverName; 280 } 281 282 public AsyncRegionServerAdmin getRegionServer() { 283 return regionServer; 284 } 285 } 286}