001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.List; 023import java.util.Map; 024import java.util.Random; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.client.ClusterConnection; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 030import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 031import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 032import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Maintains a collection of peers to replicate to, and randomly selects a 039 * single peer to replicate to per set of data to replicate. Also handles 040 * keeping track of peer availability. 041 */ 042@InterfaceAudience.Private 043public class ReplicationSinkManager { 044 045 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class); 046 047 /** 048 * Default maximum number of times a replication sink can be reported as bad before 049 * it will no longer be provided as a sink for replication without the pool of 050 * replication sinks being refreshed. 051 */ 052 static final int DEFAULT_BAD_SINK_THRESHOLD = 3; 053 054 /** 055 * Default ratio of the total number of peer cluster region servers to consider 056 * replicating to. 057 */ 058 static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f; 059 060 061 private final Connection conn; 062 063 private final String peerClusterId; 064 065 private final HBaseReplicationEndpoint endpoint; 066 067 // Count of "bad replication sink" reports per peer sink 068 private final Map<ServerName, Integer> badReportCounts; 069 070 // Ratio of total number of potential peer region servers to be used 071 private final float ratio; 072 073 // Maximum number of times a sink can be reported as bad before the pool of 074 // replication sinks is refreshed 075 private final int badSinkThreshold; 076 077 private final Random random; 078 079 // A timestamp of the last time the list of replication peers changed 080 private long lastUpdateToPeers; 081 082 // The current pool of sinks to which replication can be performed 083 private List<ServerName> sinks = Lists.newArrayList(); 084 085 /** 086 * Instantiate for a single replication peer cluster. 087 * @param conn connection to the peer cluster 088 * @param peerClusterId identifier of the peer cluster 089 * @param endpoint replication endpoint for inter cluster replication 090 * @param conf HBase configuration, used for determining replication source ratio and bad peer 091 * threshold 092 */ 093 public ReplicationSinkManager(ClusterConnection conn, String peerClusterId, 094 HBaseReplicationEndpoint endpoint, Configuration conf) { 095 this.conn = conn; 096 this.peerClusterId = peerClusterId; 097 this.endpoint = endpoint; 098 this.badReportCounts = Maps.newHashMap(); 099 this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); 100 this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold", 101 DEFAULT_BAD_SINK_THRESHOLD); 102 this.random = new Random(); 103 } 104 105 /** 106 * Get a randomly-chosen replication sink to replicate to. 107 * 108 * @return a replication sink to replicate to 109 */ 110 public synchronized SinkPeer getReplicationSink() throws IOException { 111 if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) { 112 LOG.info("Current list of sinks is out of date or empty, updating"); 113 chooseSinks(); 114 } 115 116 if (sinks.isEmpty()) { 117 throw new IOException("No replication sinks are available"); 118 } 119 ServerName serverName = sinks.get(random.nextInt(sinks.size())); 120 return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName)); 121 } 122 123 /** 124 * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it 125 * failed). If a single SinkPeer is reported as bad more than 126 * replication.bad.sink.threshold times, it will be removed 127 * from the pool of potential replication targets. 128 * 129 * @param sinkPeer 130 * The SinkPeer that had a failed replication attempt on it 131 */ 132 public synchronized void reportBadSink(SinkPeer sinkPeer) { 133 ServerName serverName = sinkPeer.getServerName(); 134 int badReportCount = (badReportCounts.containsKey(serverName) 135 ? badReportCounts.get(serverName) : 0) + 1; 136 badReportCounts.put(serverName, badReportCount); 137 if (badReportCount > badSinkThreshold) { 138 this.sinks.remove(serverName); 139 if (sinks.isEmpty()) { 140 chooseSinks(); 141 } 142 } 143 } 144 145 /** 146 * Report that a {@code SinkPeer} successfully replicated a chunk of data. 147 * 148 * @param sinkPeer 149 * The SinkPeer that had a failed replication attempt on it 150 */ 151 public synchronized void reportSinkSuccess(SinkPeer sinkPeer) { 152 badReportCounts.remove(sinkPeer.getServerName()); 153 } 154 155 /** 156 * Refresh the list of sinks. 157 */ 158 public synchronized void chooseSinks() { 159 List<ServerName> slaveAddresses = endpoint.getRegionServers(); 160 if(slaveAddresses.isEmpty()){ 161 LOG.warn("No sinks available at peer. Will not be able to replicate"); 162 } 163 Collections.shuffle(slaveAddresses, random); 164 int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio); 165 sinks = slaveAddresses.subList(0, numSinks); 166 lastUpdateToPeers = System.currentTimeMillis(); 167 badReportCounts.clear(); 168 } 169 170 public synchronized int getNumSinks() { 171 return sinks.size(); 172 } 173 174 protected List<ServerName> getSinksForTesting() { 175 return Collections.unmodifiableList(sinks); 176 } 177 178 /** 179 * Wraps a replication region server sink to provide the ability to identify 180 * it. 181 */ 182 public static class SinkPeer { 183 private ServerName serverName; 184 private AdminService.BlockingInterface regionServer; 185 186 public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) { 187 this.serverName = serverName; 188 this.regionServer = regionServer; 189 } 190 191 ServerName getServerName() { 192 return serverName; 193 } 194 195 public AdminService.BlockingInterface getRegionServer() { 196 return regionServer; 197 } 198 199 } 200 201}