001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.Map;
024import java.util.Random;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.client.ClusterConnection;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
030import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
031import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
032import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Maintains a collection of peers to replicate to, and randomly selects a
039 * single peer to replicate to per set of data to replicate. Also handles
040 * keeping track of peer availability.
041 */
042@InterfaceAudience.Private
043public class ReplicationSinkManager {
044
045  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class);
046
047  /**
048   * Default maximum number of times a replication sink can be reported as bad before
049   * it will no longer be provided as a sink for replication without the pool of
050   * replication sinks being refreshed.
051   */
052  static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
053
054  /**
055   * Default ratio of the total number of peer cluster region servers to consider
056   * replicating to.
057   */
058  static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
059
060
061  private final Connection conn;
062
063  private final String peerClusterId;
064
065  private final HBaseReplicationEndpoint endpoint;
066
067  // Count of "bad replication sink" reports per peer sink
068  private final Map<ServerName, Integer> badReportCounts;
069
070  // Ratio of total number of potential peer region servers to be used
071  private final float ratio;
072
073  // Maximum number of times a sink can be reported as bad before the pool of
074  // replication sinks is refreshed
075  private final int badSinkThreshold;
076
077  private final Random random;
078
079  // A timestamp of the last time the list of replication peers changed
080  private long lastUpdateToPeers;
081
082  // The current pool of sinks to which replication can be performed
083  private List<ServerName> sinks = Lists.newArrayList();
084
085  /**
086   * Instantiate for a single replication peer cluster.
087   * @param conn connection to the peer cluster
088   * @param peerClusterId identifier of the peer cluster
089   * @param endpoint replication endpoint for inter cluster replication
090   * @param conf HBase configuration, used for determining replication source ratio and bad peer
091   *          threshold
092   */
093  public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
094      HBaseReplicationEndpoint endpoint, Configuration conf) {
095    this.conn = conn;
096    this.peerClusterId = peerClusterId;
097    this.endpoint = endpoint;
098    this.badReportCounts = Maps.newHashMap();
099    this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
100    this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
101                                        DEFAULT_BAD_SINK_THRESHOLD);
102    this.random = new Random();
103  }
104
105  /**
106   * Get a randomly-chosen replication sink to replicate to.
107   *
108   * @return a replication sink to replicate to
109   */
110  public synchronized SinkPeer getReplicationSink() throws IOException {
111    if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
112      LOG.info("Current list of sinks is out of date or empty, updating");
113      chooseSinks();
114    }
115
116    if (sinks.isEmpty()) {
117      throw new IOException("No replication sinks are available");
118    }
119    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
120    return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
121  }
122
123  /**
124   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
125   * failed). If a single SinkPeer is reported as bad more than
126   * replication.bad.sink.threshold times, it will be removed
127   * from the pool of potential replication targets.
128   *
129   * @param sinkPeer
130   *          The SinkPeer that had a failed replication attempt on it
131   */
132  public synchronized void reportBadSink(SinkPeer sinkPeer) {
133    ServerName serverName = sinkPeer.getServerName();
134    int badReportCount = (badReportCounts.containsKey(serverName)
135                    ? badReportCounts.get(serverName) : 0) + 1;
136    badReportCounts.put(serverName, badReportCount);
137    if (badReportCount > badSinkThreshold) {
138      this.sinks.remove(serverName);
139      if (sinks.isEmpty()) {
140        chooseSinks();
141      }
142    }
143  }
144
145  /**
146   * Report that a {@code SinkPeer} successfully replicated a chunk of data.
147   *
148   * @param sinkPeer
149   *          The SinkPeer that had a failed replication attempt on it
150   */
151  public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
152    badReportCounts.remove(sinkPeer.getServerName());
153  }
154
155  /**
156   * Refresh the list of sinks.
157   */
158  public synchronized void chooseSinks() {
159    List<ServerName> slaveAddresses = endpoint.getRegionServers();
160    if(slaveAddresses.isEmpty()){
161      LOG.warn("No sinks available at peer. Will not be able to replicate");
162    }
163    Collections.shuffle(slaveAddresses, random);
164    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
165    sinks = slaveAddresses.subList(0, numSinks);
166    lastUpdateToPeers = System.currentTimeMillis();
167    badReportCounts.clear();
168  }
169
170  public synchronized int getNumSinks() {
171    return sinks.size();
172  }
173
174  protected List<ServerName> getSinksForTesting() {
175    return Collections.unmodifiableList(sinks);
176  }
177
178  /**
179   * Wraps a replication region server sink to provide the ability to identify
180   * it.
181   */
182  public static class SinkPeer {
183    private ServerName serverName;
184    private AdminService.BlockingInterface regionServer;
185
186    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
187      this.serverName = serverName;
188      this.regionServer = regionServer;
189    }
190
191    ServerName getServerName() {
192      return serverName;
193    }
194
195    public AdminService.BlockingInterface getRegionServer() {
196      return regionServer;
197    }
198
199  }
200
201}