/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;

/**
 * Maintains a collection of peers to replicate to, and randomly selects a
 * single peer to replicate to per set of data to replicate. Also handles
 * keeping track of peer availability.
 * <p>
 * The methods that read or modify the sink pool and the bad-report counters are
 * {@code synchronized} on this instance, with the exception of
 * {@link #getSinksForTesting()}.
 */
@InterfaceAudience.Private
public class ReplicationSinkManager {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class);

  /**
   * Default maximum number of times a replication sink can be reported as bad before
   * it will no longer be provided as a sink for replication without the pool of
   * replication sinks being refreshed.
   */
  static final int DEFAULT_BAD_SINK_THRESHOLD = 3;

  /**
   * Default ratio of the total number of peer cluster region servers to consider
   * replicating to.
   */
  static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;


  // Always a ClusterConnection: the only constructor accepts nothing else, which is what
  // makes the cast in getReplicationSink() safe.
  private final Connection conn;

  private final String peerClusterId;

  private final HBaseReplicationEndpoint endpoint;

  // Count of "bad replication sink" reports per peer sink
  private final Map<ServerName, Integer> badReportCounts;

  // Ratio of total number of potential peer region servers to be used
  private final float ratio;

  // Maximum number of times a sink can be reported as bad before the pool of
  // replication sinks is refreshed
  private final int badSinkThreshold;

  private final Random random;

  // A timestamp of the last time the list of replication peers changed
  private long lastUpdateToPeers;

  // The current pool of sinks to which replication can be performed. Always a list owned
  // by this manager, never a view over a list handed out by the endpoint.
  private List<ServerName> sinks = Lists.newArrayList();

  /**
   * Instantiate for a single replication peer cluster.
   * @param conn connection to the peer cluster
   * @param peerClusterId identifier of the peer cluster
   * @param endpoint replication endpoint for inter cluster replication
   * @param conf HBase configuration, used for determining replication source ratio and bad peer
   *          threshold
   */
  public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
      HBaseReplicationEndpoint endpoint, Configuration conf) {
    this.conn = conn;
    this.peerClusterId = peerClusterId;
    this.endpoint = endpoint;
    this.badReportCounts = Maps.newHashMap();
    this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
    this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
      DEFAULT_BAD_SINK_THRESHOLD);
    this.random = new Random();
  }

  /**
   * Get a randomly-chosen replication sink to replicate to. The pool of sinks is refreshed
   * first when it is empty or when the endpoint reports a region server update newer than
   * the last refresh.
   *
   * @return a replication sink to replicate to
   * @throws IOException if no sink is available even after refreshing the pool
   */
  public synchronized SinkPeer getReplicationSink() throws IOException {
    if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
      LOG.info("Current list of sinks is out of date or empty, updating");
      chooseSinks();
    }

    if (sinks.isEmpty()) {
      throw new IOException("No replication sinks are available");
    }
    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
    return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
  }

  /**
   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
   * failed). If a single SinkPeer is reported as bad more than
   * replication.bad.sink.threshold times, it will be removed
   * from the pool of potential replication targets. When that removal empties the
   * pool, the pool is refreshed immediately.
   *
   * @param sinkPeer
   *          The SinkPeer that had a failed replication attempt on it
   */
  public synchronized void reportBadSink(SinkPeer sinkPeer) {
    ServerName serverName = sinkPeer.getServerName();
    Integer previousCount = badReportCounts.get(serverName);
    int badReportCount = (previousCount == null ? 0 : previousCount) + 1;
    badReportCounts.put(serverName, badReportCount);
    if (badReportCount > badSinkThreshold) {
      this.sinks.remove(serverName);
      if (sinks.isEmpty()) {
        chooseSinks();
      }
    }
  }

  /**
   * Report that a {@code SinkPeer} successfully replicated a chunk of data. This
   * clears any bad reports previously recorded against it.
   *
   * @param sinkPeer
   *          The SinkPeer that had a successful replication attempt on it
   */
  public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
    badReportCounts.remove(sinkPeer.getServerName());
  }

  /**
   * Refresh the list of sinks. A random subset of the endpoint's region servers, sized by
   * the configured ratio, becomes the new pool, and all bad-sink reports are cleared.
   */
  public synchronized void chooseSinks() {
    // Taken before the list is fetched: assuming the endpoint stamps its update time when
    // its server list changes, a change racing with this refresh still looks newer than our
    // snapshot and triggers another refresh on the next lookup instead of being missed.
    long refreshStarted = System.currentTimeMillis();

    // Work on a private copy. Shuffling the endpoint's list in place, or keeping a subList
    // view of it, would let removals here alter a list this class does not own, and a
    // structural change on the endpoint side would invalidate the view.
    List<ServerName> candidates = Lists.newArrayList(endpoint.getRegionServers());
    Collections.shuffle(candidates, random);

    // Clamp so a configured ratio outside [0, 1] cannot push subList() out of bounds.
    int wanted = (int) Math.ceil(candidates.size() * ratio);
    int numSinks = Math.max(0, Math.min(candidates.size(), wanted));

    sinks = Lists.newArrayList(candidates.subList(0, numSinks));
    lastUpdateToPeers = refreshStarted;
    badReportCounts.clear();
  }

  /**
   * @return the number of sinks currently in the pool
   */
  public synchronized int getNumSinks() {
    return sinks.size();
  }

  /**
   * @return a read-only view of the current pool of sinks
   */
  @VisibleForTesting
  protected List<ServerName> getSinksForTesting() {
    return Collections.unmodifiableList(sinks);
  }

  /**
   * Wraps a replication region server sink to provide the ability to identify
   * it.
   */
  public static class SinkPeer {
    private final ServerName serverName;
    private final AdminService.BlockingInterface regionServer;

    /**
     * @param serverName name of the region server acting as the sink
     * @param regionServer admin stub associated with that region server
     */
    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
      this.serverName = serverName;
      this.regionServer = regionServer;
    }

    ServerName getServerName() {
      return serverName;
    }

    public AdminService.BlockingInterface getRegionServer() {
      return regionServer;
    }

  }

}