/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;

/**
 * Maintains a collection of peer cluster region servers (sinks) and randomly selects one
 * sink per batch of data to replicate. Also keeps track of sink availability.
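 * <p>
 * A minimal usage sketch (the {@code conn}, {@code endpoint}, and {@code conf} values are
 * assumed to be supplied by the caller's replication context):
 * <pre>
 * ReplicationSinkManager sinkManager =
 *     new ReplicationSinkManager(conn, peerClusterId, endpoint, conf);
 * SinkPeer sinkPeer = sinkManager.getReplicationSink();
 * try {
 *   // ship a batch of edits to sinkPeer.getRegionServer() ...
 *   sinkManager.reportSinkSuccess(sinkPeer);
 * } catch (IOException e) {
 *   sinkManager.reportBadSink(sinkPeer);
 * }
 * </pre>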
 */
@InterfaceAudience.Private
public class ReplicationSinkManager {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class);

  /**
   * Default maximum number of times a replication sink can be reported as bad before
   * it is removed from the pool of potential replication sinks (until the pool is
   * next refreshed).
   */
  static final int DEFAULT_BAD_SINK_THRESHOLD = 3;

  /**
   * Default ratio of the total number of peer cluster region servers to consider
   * replicating to.
   */
  static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;

  private final ClusterConnection conn;

  private final String peerClusterId;

  private final HBaseReplicationEndpoint endpoint;

  // Count of "bad replication sink" reports per peer sink
  private final Map<ServerName, Integer> badReportCounts;

  // Ratio of total number of potential peer region servers to be used
  private final float ratio;

  // Maximum number of times a sink can be reported as bad before it is
  // removed from the pool of replication sinks
  private final int badSinkThreshold;

  private final Random random;

  // A timestamp of the last time the list of replication peers changed
  private long lastUpdateToPeers;

  // The current pool of sinks to which replication can be performed
  private List<ServerName> sinks = Lists.newArrayList();

  /**
   * Instantiate for a single replication peer cluster.
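   * <p>
   * For example, the defaults can be tuned through the supplied configuration (a sketch;
   * both keys are optional):
   * <pre>
   * conf.setFloat("replication.source.ratio", 0.75f);
   * conf.setInt("replication.bad.sink.threshold", 5);
   * </pre>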
   * @param conn connection to the peer cluster
   * @param peerClusterId identifier of the peer cluster
   * @param endpoint replication endpoint for inter-cluster replication
   * @param conf HBase configuration, used for determining replication source ratio and bad sink
   *          threshold
   */
  public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
      HBaseReplicationEndpoint endpoint, Configuration conf) {
    this.conn = conn;
    this.peerClusterId = peerClusterId;
    this.endpoint = endpoint;
    this.badReportCounts = Maps.newHashMap();
    this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
    this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
                                        DEFAULT_BAD_SINK_THRESHOLD);
    this.random = new Random();
  }

  /**
   * Get a randomly-chosen replication sink to replicate to.
   *
   * @return a replication sink to replicate to
   * @throws IOException if no replication sinks are available
   */
  public synchronized SinkPeer getReplicationSink() throws IOException {
    if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
      LOG.info("Current list of sinks is out of date or empty, updating");
      chooseSinks();
    }

    if (sinks.isEmpty()) {
      throw new IOException("No replication sinks are available");
    }
    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
    return new SinkPeer(serverName, conn.getAdmin(serverName));
  }

  /**
   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
   * failed). If a single SinkPeer is reported as bad more than
   * {@code replication.bad.sink.threshold} times, it will be removed
   * from the pool of potential replication targets.
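   * For example, with the default threshold of 3, a sink is dropped from the pool on its
   * fourth bad report, since the count is only reset by a successful replication or a
   * pool refresh.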
   *
   * @param sinkPeer
   *          The SinkPeer that had a failed replication attempt on it
   */
  public synchronized void reportBadSink(SinkPeer sinkPeer) {
    ServerName serverName = sinkPeer.getServerName();
    int badReportCount = badReportCounts.getOrDefault(serverName, 0) + 1;
    badReportCounts.put(serverName, badReportCount);
    if (badReportCount > badSinkThreshold) {
      this.sinks.remove(serverName);
      if (sinks.isEmpty()) {
        chooseSinks();
      }
    }
  }

  /**
   * Report that a {@code SinkPeer} successfully replicated a chunk of data.
   *
   * @param sinkPeer
   *          The SinkPeer that had a successful replication attempt on it
   */
  public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
    badReportCounts.remove(sinkPeer.getServerName());
  }

  /**
   * Refresh the list of sinks.
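   * <p>
   * A random {@code ratio} fraction of the peer cluster's region servers is chosen; for
   * example, with 10 region servers and the default ratio of 0.5, {@code ceil(10 * 0.5) = 5}
   * servers become sinks.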
   */
  public synchronized void chooseSinks() {
    // Shuffle a copy so the list returned by the endpoint is not mutated
    List<ServerName> slaveAddresses = Lists.newArrayList(endpoint.getRegionServers());
    Collections.shuffle(slaveAddresses, random);
    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
    sinks = slaveAddresses.subList(0, numSinks);
    lastUpdateToPeers = System.currentTimeMillis();
    badReportCounts.clear();
  }

  public synchronized int getNumSinks() {
    return sinks.size();
  }

  @VisibleForTesting
  protected synchronized List<ServerName> getSinksForTesting() {
    return Collections.unmodifiableList(sinks);
  }

  /**
   * Wraps a replication region server sink to provide the ability to identify it.
   */
  public static class SinkPeer {
    private final ServerName serverName;
    private final AdminService.BlockingInterface regionServer;

    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
      this.serverName = serverName;
      this.regionServer = regionServer;
    }

    ServerName getServerName() {
      return serverName;
    }

    public AdminService.BlockingInterface getRegionServer() {
      return regionServer;
    }
  }

}