View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Random;
25  
26  import com.google.common.annotations.VisibleForTesting;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.ServerName;
31  import org.apache.hadoop.hbase.client.HConnection;
32  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
33  import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
34  import com.google.common.collect.Lists;
35  import com.google.common.collect.Maps;
36  
37  /**
38   * Maintains a collection of peers to replicate to, and randomly selects a
39   * single peer to replicate to per set of data to replicate. Also handles
40   * keeping track of peer availability.
41   */
42  public class ReplicationSinkManager {
43  
44    private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);
45  
46    /**
47     * Default maximum number of times a replication sink can be reported as bad before
48     * it will no longer be provided as a sink for replication without the pool of
49     * replication sinks being refreshed.
50     */
51    static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
52  
53    /**
54     * Default ratio of the total number of peer cluster region servers to consider
55     * replicating to.
56     */
57    static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f;
58  
59  
60    private final HConnection conn;
61  
62    private final String peerClusterId;
63  
64    private final HBaseReplicationEndpoint endpoint;
65  
66    // Count of "bad replication sink" reports per peer sink
67    private final Map<ServerName, Integer> badReportCounts;
68  
69    // Ratio of total number of potential peer region servers to be used
70    private final float ratio;
71  
72    // Maximum number of times a sink can be reported as bad before the pool of
73    // replication sinks is refreshed
74    private final int badSinkThreshold;
75  
76    private final Random random;
77  
78    // A timestamp of the last time the list of replication peers changed
79    private long lastUpdateToPeers;
80  
81    // The current pool of sinks to which replication can be performed
82    private List<ServerName> sinks = Lists.newArrayList();
83  
84    /**
85     * Instantiate for a single replication peer cluster.
86     * @param conn connection to the peer cluster
87     * @param peerClusterId identifier of the peer cluster
88     * @param endpoint replication endpoint for inter cluster replication
89     * @param conf HBase configuration, used for determining replication source ratio and bad peer
90     *          threshold
91     */
92    public ReplicationSinkManager(HConnection conn, String peerClusterId,
93        HBaseReplicationEndpoint endpoint, Configuration conf) {
94      this.conn = conn;
95      this.peerClusterId = peerClusterId;
96      this.endpoint = endpoint;
97      this.badReportCounts = Maps.newHashMap();
98      this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
99      this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
100                                         DEFAULT_BAD_SINK_THRESHOLD);
101     this.random = new Random();
102   }
103 
104   /**
105    * Get a randomly-chosen replication sink to replicate to.
106    *
107    * @return a replication sink to replicate to
108    */
109   public synchronized SinkPeer getReplicationSink() throws IOException {
110     if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
111       LOG.info("Current list of sinks is out of date or empty, updating");
112       chooseSinks();
113     }
114 
115     if (sinks.isEmpty()) {
116       throw new IOException("No replication sinks are available");
117     }
118     ServerName serverName = sinks.get(random.nextInt(sinks.size()));
119     return new SinkPeer(serverName, conn.getAdmin(serverName));
120   }
121 
122   /**
123    * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
124    * failed). If a single SinkPeer is reported as bad more than
125    * replication.bad.sink.threshold times, it will be removed
126    * from the pool of potential replication targets.
127    *
128    * @param sinkPeer
129    *          The SinkPeer that had a failed replication attempt on it
130    */
131   public synchronized void reportBadSink(SinkPeer sinkPeer) {
132     ServerName serverName = sinkPeer.getServerName();
133     int badReportCount = (badReportCounts.containsKey(serverName)
134                     ? badReportCounts.get(serverName) : 0) + 1;
135     badReportCounts.put(serverName, badReportCount);
136     if (badReportCount > badSinkThreshold) {
137       this.sinks.remove(serverName);
138       if (sinks.isEmpty()) {
139         chooseSinks();
140       }
141     }
142   }
143 
144   /**
145    * Report that a {@code SinkPeer} successfully replicated a chunk of data.
146    *
147    * @param sinkPeer
148    *          The SinkPeer that had a failed replication attempt on it
149    */
150   public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
151     badReportCounts.remove(sinkPeer.getServerName());
152   }
153 
154   /**
155    * Refresh the list of sinks.
156    */
157   public synchronized void chooseSinks() {
158     List<ServerName> slaveAddresses = endpoint.getRegionServers();
159     Collections.shuffle(slaveAddresses, random);
160     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
161     sinks = slaveAddresses.subList(0, numSinks);
162     lastUpdateToPeers = System.currentTimeMillis();
163     badReportCounts.clear();
164   }
165 
166   public synchronized int getNumSinks() {
167     return sinks.size();
168   }
169 
170   @VisibleForTesting
171   protected List<ServerName> getSinksForTesting() {
172     return Collections.unmodifiableList(sinks);
173   }
174 
175   /**
176    * Wraps a replication region server sink to provide the ability to identify
177    * it.
178    */
179   public static class SinkPeer {
180     private ServerName serverName;
181     private AdminService.BlockingInterface regionServer;
182 
183     public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
184       this.serverName = serverName;
185       this.regionServer = regionServer;
186     }
187 
188     ServerName getServerName() {
189       return serverName;
190     }
191 
192     public AdminService.BlockingInterface getRegionServer() {
193       return regionServer;
194     }
195 
196   }
197 
198 }