001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.EnumSet;
025import java.util.List;
026import java.util.Map;
027import java.util.UUID;
028import java.util.concurrent.ThreadLocalRandom;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.Abortable;
031import org.apache.hadoop.hbase.ClusterMetrics;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.client.AsyncClusterConnection;
035import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
036import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
037import org.apache.hadoop.hbase.security.User;
038import org.apache.hadoop.hbase.util.FutureUtils;
039import org.apache.hadoop.hbase.util.ReservoirSample;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
045
046/**
047 * A {@link BaseReplicationEndpoint} for replication endpoints whose target cluster is an HBase
048 * cluster.
049 */
050@InterfaceAudience.Private
051public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
052  implements Abortable {
053
054  private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
055
056  protected Configuration conf;
057
058  private final Object connLock = new Object();
059
060  private volatile AsyncClusterConnection conn;
061
062  /**
063   * Default maximum number of times a replication sink can be reported as bad before it will no
064   * longer be provided as a sink for replication without the pool of replication sinks being
065   * refreshed.
066   */
067  public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
068
069  /**
070   * Default ratio of the total number of peer cluster region servers to consider replicating to.
071   */
072  public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
073
074  // Ratio of total number of potential peer region servers to be used
075  private float ratio;
076
077  // Maximum number of times a sink can be reported as bad before the pool of
078  // replication sinks is refreshed
079  private int badSinkThreshold;
080  // Count of "bad replication sink" reports per peer sink
081  private Map<ServerName, Integer> badReportCounts;
082
083  private List<ServerName> sinkServers = new ArrayList<>(0);
084
085  /*
086   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
087   * Connection implementations, or initialize it in a different way, so defining createConnection
088   * as protected for possible overridings.
089   */
090  protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
091    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
092  }
093
094  @Override
095  public void init(Context context) throws IOException {
096    super.init(context);
097    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
098    this.ratio =
099      ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
100    this.badSinkThreshold =
101      ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
102    this.badReportCounts = Maps.newHashMap();
103  }
104
105  private void disconnect() {
106    synchronized (connLock) {
107      if (this.conn != null) {
108        try {
109          this.conn.close();
110          this.conn = null;
111        } catch (IOException e) {
112          LOG.warn("{} Failed to close the connection", ctx.getPeerId());
113        }
114      }
115    }
116  }
117
118  @Override
119  public void start() {
120    startAsync();
121  }
122
123  @Override
124  public void stop() {
125    stopAsync();
126  }
127
128  @Override
129  protected void doStart() {
130    notifyStarted();
131  }
132
133  @Override
134  protected void doStop() {
135    disconnect();
136    notifyStopped();
137  }
138
139  @Override
140  public UUID getPeerUUID() {
141    try {
142      AsyncClusterConnection conn = connect();
143      String clusterId = FutureUtils
144        .get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)))
145        .getClusterId();
146      return UUID.fromString(clusterId);
147    } catch (IOException e) {
148      LOG.warn("Failed to get cluster id for cluster", e);
149      return null;
150    }
151  }
152
153  // do not call this method in doStart method, only initialize the connection to remote cluster
154  // when you actually wants to make use of it. The problem here is that, starting the replication
155  // endpoint is part of the region server initialization work, so if the peer cluster is fully
156  // down and we can not connect to it, we will cause the initialization to fail and crash the
157  // region server, as we need the cluster id while setting up the AsyncClusterConnection, which
158  // needs to at least connect to zookeeper or some other servers in the peer cluster based on
159  // different connection registry implementation
160  private AsyncClusterConnection connect() throws IOException {
161    AsyncClusterConnection c = this.conn;
162    if (c != null) {
163      return c;
164    }
165    synchronized (connLock) {
166      c = this.conn;
167      if (c != null) {
168        return c;
169      }
170      c = createConnection(this.conf);
171      conn = c;
172    }
173    return c;
174  }
175
176  @Override
177  public void abort(String why, Throwable e) {
178    LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
179      + " was aborted for the following reason(s):" + why, e);
180  }
181
182  @Override
183  public boolean isAborted() {
184    // Currently this is never "Aborted", we just log when the abort method is called.
185    return false;
186  }
187
188  /**
189   * Get the list of all the region servers from the specified peer
190   * @return list of region server addresses or an empty list if the slave is unavailable
191   */
192  // will be overrided in tests so protected
193  protected Collection<ServerName> fetchPeerAddresses() {
194    try {
195      return FutureUtils.get(connect().getAdmin().getRegionServers(true));
196    } catch (IOException e) {
197      LOG.debug("Fetch peer addresses failed", e);
198      return Collections.emptyList();
199    }
200  }
201
202  protected synchronized void chooseSinks() {
203    Collection<ServerName> slaveAddresses = fetchPeerAddresses();
204    if (slaveAddresses.isEmpty()) {
205      LOG.warn("No sinks available at peer. Will not be able to replicate");
206      this.sinkServers = Collections.emptyList();
207    } else {
208      int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
209      ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks);
210      sample.add(slaveAddresses.iterator());
211      this.sinkServers = sample.getSamplingResult();
212    }
213    badReportCounts.clear();
214  }
215
216  protected synchronized int getNumSinks() {
217    return sinkServers.size();
218  }
219
220  /**
221   * Get a randomly-chosen replication sink to replicate to.
222   * @return a replication sink to replicate to
223   */
224  protected synchronized SinkPeer getReplicationSink() throws IOException {
225    if (sinkServers.isEmpty()) {
226      LOG.info("Current list of sinks is out of date or empty, updating");
227      chooseSinks();
228    }
229    if (sinkServers.isEmpty()) {
230      throw new IOException("No replication sinks are available");
231    }
232    ServerName serverName =
233      sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
234    return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName));
235  }
236
237  /**
238   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single
239   * SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed
240   * from the pool of potential replication targets.
241   * @param sinkPeer The SinkPeer that had a failed replication attempt on it
242   */
243  protected synchronized void reportBadSink(SinkPeer sinkPeer) {
244    ServerName serverName = sinkPeer.getServerName();
245    int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1);
246    if (badReportCount > badSinkThreshold) {
247      this.sinkServers.remove(serverName);
248      if (sinkServers.isEmpty()) {
249        chooseSinks();
250      }
251    }
252  }
253
254  /**
255   * Report that a {@code SinkPeer} successfully replicated a chunk of data. The SinkPeer that had a
256   * failed replication attempt on it
257   */
258  protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
259    badReportCounts.remove(sinkPeer.getServerName());
260  }
261
262  List<ServerName> getSinkServers() {
263    return sinkServers;
264  }
265
266  /**
267   * Wraps a replication region server sink to provide the ability to identify it.
268   */
269  public static class SinkPeer {
270    private ServerName serverName;
271    private AsyncRegionServerAdmin regionServer;
272
273    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
274      this.serverName = serverName;
275      this.regionServer = regionServer;
276    }
277
278    ServerName getServerName() {
279      return serverName;
280    }
281
282    public AsyncRegionServerAdmin getRegionServer() {
283      return regionServer;
284    }
285  }
286}