001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.Map;
024import java.util.Random;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.client.ClusterConnection;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
036import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
039
040/**
041 * Maintains a collection of peers to replicate to, and randomly selects a single peer to replicate
042 * to per set of data to replicate. Also handles keeping track of peer availability.
043 */
044@InterfaceAudience.Private
045public class ReplicationSinkManager {
046
047  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkManager.class);
048
049  /**
050   * Default maximum number of times a replication sink can be reported as bad before it will no
051   * longer be provided as a sink for replication without the pool of replication sinks being
052   * refreshed.
053   */
054  static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
055
056  /**
057   * Default ratio of the total number of peer cluster region servers to consider replicating to.
058   */
059  static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
060
061  private final Connection conn;
062
063  private final String peerClusterId;
064
065  private final HBaseReplicationEndpoint endpoint;
066
067  // Count of "bad replication sink" reports per peer sink
068  private final Map<ServerName, Integer> badReportCounts;
069
070  // Ratio of total number of potential peer region servers to be used
071  private final float ratio;
072
073  // Maximum number of times a sink can be reported as bad before the pool of
074  // replication sinks is refreshed
075  private final int badSinkThreshold;
076
077  private final Random random;
078
079  // A timestamp of the last time the list of replication peers changed
080  private long lastUpdateToPeers;
081
082  // The current pool of sinks to which replication can be performed
083  private List<ServerName> sinks = Lists.newArrayList();
084
085  /**
086   * Instantiate for a single replication peer cluster.
087   * @param conn          connection to the peer cluster
088   * @param peerClusterId identifier of the peer cluster
089   * @param endpoint      replication endpoint for inter cluster replication
090   * @param conf          HBase configuration, used for determining replication source ratio and bad
091   *                      peer threshold
092   */
093  public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
094    HBaseReplicationEndpoint endpoint, Configuration conf) {
095    this.conn = conn;
096    this.peerClusterId = peerClusterId;
097    this.endpoint = endpoint;
098    this.badReportCounts = Maps.newHashMap();
099    this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
100    this.badSinkThreshold =
101      conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
102    this.random = new Random();
103  }
104
105  /**
106   * Get a randomly-chosen replication sink to replicate to.
107   * @return a replication sink to replicate to
108   */
109  public synchronized SinkPeer getReplicationSink() throws IOException {
110    if (endpoint.getLastRegionServerUpdate() > this.lastUpdateToPeers || sinks.isEmpty()) {
111      LOG.info("Current list of sinks is out of date or empty, updating");
112      chooseSinks();
113    }
114
115    if (sinks.isEmpty()) {
116      throw new IOException("No replication sinks are available");
117    }
118    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
119    return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
120  }
121
122  /**
123   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single
124   * SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed
125   * from the pool of potential replication targets. n * The SinkPeer that had a failed replication
126   * attempt on it
127   */
128  public synchronized void reportBadSink(SinkPeer sinkPeer) {
129    ServerName serverName = sinkPeer.getServerName();
130    int badReportCount =
131      (badReportCounts.containsKey(serverName) ? badReportCounts.get(serverName) : 0) + 1;
132    badReportCounts.put(serverName, badReportCount);
133    if (badReportCount > badSinkThreshold) {
134      this.sinks.remove(serverName);
135      if (sinks.isEmpty()) {
136        chooseSinks();
137      }
138    }
139  }
140
141  /**
142   * Report that a {@code SinkPeer} successfully replicated a chunk of data. n * The SinkPeer that
143   * had a failed replication attempt on it
144   */
145  public synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
146    badReportCounts.remove(sinkPeer.getServerName());
147  }
148
149  /**
150   * Refresh the list of sinks.
151   */
152  public synchronized void chooseSinks() {
153    List<ServerName> slaveAddresses = endpoint.getRegionServers();
154    if (slaveAddresses.isEmpty()) {
155      LOG.warn("No sinks available at peer. Will not be able to replicate");
156    }
157    Collections.shuffle(slaveAddresses, random);
158    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
159    sinks = slaveAddresses.subList(0, numSinks);
160    lastUpdateToPeers = EnvironmentEdgeManager.currentTime();
161    badReportCounts.clear();
162  }
163
164  public synchronized int getNumSinks() {
165    return sinks.size();
166  }
167
168  protected List<ServerName> getSinksForTesting() {
169    return Collections.unmodifiableList(sinks);
170  }
171
172  /**
173   * Wraps a replication region server sink to provide the ability to identify it.
174   */
175  public static class SinkPeer {
176    private ServerName serverName;
177    private AdminService.BlockingInterface regionServer;
178
179    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
180      this.serverName = serverName;
181      this.regionServer = regionServer;
182    }
183
184    ServerName getServerName() {
185      return serverName;
186    }
187
188    public AdminService.BlockingInterface getRegionServer() {
189      return regionServer;
190    }
191
192  }
193
194}