View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.replication.regionserver;
19  
20  import java.io.IOException;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Random;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.ServerName;
30  import org.apache.hadoop.hbase.client.HConnection;
31  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
32  import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
33  import com.google.common.collect.Lists;
34  import com.google.common.collect.Maps;
35  
36  /**
37   * Maintains a collection of peers to replicate to, and randomly selects a
38   * single peer to replicate to per set of data to replicate. Also handles
39   * keeping track of peer availability.
40   */
41  public class ReplicationSinkManager {
42  
43    private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);
44  
45    /**
46     * Default maximum number of times a replication sink can be reported as bad before
47     * it will no longer be provided as a sink for replication without the pool of
48     * replication sinks being refreshed.
49     */
50    static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
51  
52    /**
53     * Default ratio of the total number of peer cluster region servers to consider
54     * replicating to.
55     */
56    static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f;
57  
58  
59    private final HConnection conn;
60  
61    private final String peerClusterId;
62  
63    private final HBaseReplicationEndpoint endpoint;
64  
65    // Count of "bad replication sink" reports per peer sink
66    private final Map<ServerName, Integer> badReportCounts;
67  
68    // Ratio of total number of potential peer region servers to be used
69    private final float ratio;
70  
71    // Maximum number of times a sink can be reported as bad before the pool of
72    // replication sinks is refreshed
73    private final int badSinkThreshold;
74  
75    private final Random random;
76  
77    // A timestamp of the last time the list of replication peers changed
78    private long lastUpdateToPeers;
79  
80    // The current pool of sinks to which replication can be performed
81    private List<ServerName> sinks = Lists.newArrayList();
82  
83    /**
84     * Instantiate for a single replication peer cluster.
85     * @param conn connection to the peer cluster
86     * @param peerClusterId identifier of the peer cluster
87     * @param endpoint replication endpoint for inter cluster replication
88     * @param conf HBase configuration, used for determining replication source ratio and bad peer
89     *          threshold
90     */
91    public ReplicationSinkManager(HConnection conn, String peerClusterId,
92        HBaseReplicationEndpoint endpoint, Configuration conf) {
93      this.conn = conn;
94      this.peerClusterId = peerClusterId;
95      this.endpoint = endpoint;
96      this.badReportCounts = Maps.newHashMap();
97      this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
98      this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
99                                          DEFAULT_BAD_SINK_THRESHOLD);
100     this.random = new Random();
101   }
102 
103   /**
104    * Get a randomly-chosen replication sink to replicate to.
105    *
106    * @return a replication sink to replicate to
107    */
108   public SinkPeer getReplicationSink() throws IOException {
109     if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
110       LOG.info("Current list of sinks is out of date or empty, updating");
111       chooseSinks();
112     }
113 
114     if (sinks.isEmpty()) {
115       throw new IOException("No replication sinks are available");
116     }
117     ServerName serverName = sinks.get(random.nextInt(sinks.size()));
118     return new SinkPeer(serverName, conn.getAdmin(serverName));
119   }
120 
121   /**
122    * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
123    * failed). If a single SinkPeer is reported as bad more than
124    * replication.bad.sink.threshold times, it will be removed
125    * from the pool of potential replication targets.
126    *
127    * @param sinkPeer
128    *          The SinkPeer that had a failed replication attempt on it
129    */
130   public void reportBadSink(SinkPeer sinkPeer) {
131     ServerName serverName = sinkPeer.getServerName();
132     int badReportCount = (badReportCounts.containsKey(serverName)
133                     ? badReportCounts.get(serverName) : 0) + 1;
134     badReportCounts.put(serverName, badReportCount);
135     if (badReportCount > badSinkThreshold) {
136       this.sinks.remove(serverName);
137       if (sinks.isEmpty()) {
138         chooseSinks();
139       }
140     }
141   }
142 
143   /**
144    * Report that a {@code SinkPeer} successfully replicated a chunk of data.
145    *
146    * @param sinkPeer
147    *          The SinkPeer that had a failed replication attempt on it
148    */
149   public void reportSinkSuccess(SinkPeer sinkPeer) {
150     badReportCounts.remove(sinkPeer.getServerName());
151   }
152 
153   void chooseSinks() {
154     List<ServerName> slaveAddresses = endpoint.getRegionServers();
155     Collections.shuffle(slaveAddresses, random);
156     int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
157     sinks = slaveAddresses.subList(0, numSinks);
158     lastUpdateToPeers = System.currentTimeMillis();
159     badReportCounts.clear();
160   }
161 
162   List<ServerName> getSinks() {
163     return sinks;
164   }
165 
166   /**
167    * Wraps a replication region server sink to provide the ability to identify
168    * it.
169    */
170   public static class SinkPeer {
171     private ServerName serverName;
172     private AdminService.BlockingInterface regionServer;
173 
174     public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
175       this.serverName = serverName;
176       this.regionServer = regionServer;
177     }
178 
179     ServerName getServerName() {
180       return serverName;
181     }
182 
183     public AdminService.BlockingInterface getRegionServer() {
184       return regionServer;
185     }
186 
187   }
188 
189 }