001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025import java.util.UUID;
026import java.util.concurrent.ThreadLocalRandom;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Abortable;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.client.AsyncClusterConnection;
032import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
033import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
034import org.apache.hadoop.hbase.security.User;
035import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
036import org.apache.hadoop.hbase.zookeeper.ZKListener;
037import org.apache.hadoop.hbase.zookeeper.ZKUtil;
038import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.zookeeper.KeeperException;
041import org.apache.zookeeper.KeeperException.AuthFailedException;
042import org.apache.zookeeper.KeeperException.ConnectionLossException;
043import org.apache.zookeeper.KeeperException.SessionExpiredException;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
048
049/**
050 * A {@link BaseReplicationEndpoint} for replication endpoints whose target cluster is an HBase
051 * cluster.
052 */
053@InterfaceAudience.Private
054public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
055  implements Abortable {
056
057  private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
058
059  private ZKWatcher zkw = null;
060  private final Object zkwLock = new Object();
061
062  protected Configuration conf;
063
064  private AsyncClusterConnection conn;
065
066  /**
067   * Default maximum number of times a replication sink can be reported as bad before it will no
068   * longer be provided as a sink for replication without the pool of replication sinks being
069   * refreshed.
070   */
071  public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
072
073  /**
074   * Default ratio of the total number of peer cluster region servers to consider replicating to.
075   */
076  public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
077
078  // Ratio of total number of potential peer region servers to be used
079  private float ratio;
080
081  // Maximum number of times a sink can be reported as bad before the pool of
082  // replication sinks is refreshed
083  private int badSinkThreshold;
084  // Count of "bad replication sink" reports per peer sink
085  private Map<ServerName, Integer> badReportCounts;
086
087  private List<ServerName> sinkServers = new ArrayList<>(0);
088
089  /*
090   * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
091   * Connection implementations, or initialize it in a different way, so defining createConnection
092   * as protected for possible overridings.
093   */
094  protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
095    return ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
096  }
097
098  @Override
099  public void init(Context context) throws IOException {
100    super.init(context);
101    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
102    this.ratio =
103      ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
104    this.badSinkThreshold =
105      ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
106    this.badReportCounts = Maps.newHashMap();
107  }
108
109  protected void disconnect() {
110    synchronized (zkwLock) {
111      if (zkw != null) {
112        zkw.close();
113      }
114    }
115    if (this.conn != null) {
116      try {
117        this.conn.close();
118        this.conn = null;
119      } catch (IOException e) {
120        LOG.warn("{} Failed to close the connection", ctx.getPeerId());
121      }
122    }
123  }
124
125  /**
126   * A private method used to re-establish a zookeeper session with a peer cluster.
127   */
128  private void reconnect(KeeperException ke) {
129    if (
130      ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
131        || ke instanceof AuthFailedException
132    ) {
133      String clusterKey = ctx.getPeerConfig().getClusterKey();
134      LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
135      try {
136        reloadZkWatcher();
137      } catch (IOException io) {
138        LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
139      }
140    }
141  }
142
143  @Override
144  public void start() {
145    startAsync();
146  }
147
148  @Override
149  public void stop() {
150    stopAsync();
151  }
152
153  @Override
154  protected void doStart() {
155    try {
156      reloadZkWatcher();
157      connectPeerCluster();
158      notifyStarted();
159    } catch (IOException e) {
160      notifyFailed(e);
161    }
162  }
163
164  @Override
165  protected void doStop() {
166    disconnect();
167    notifyStopped();
168  }
169
170  @Override
171  // Synchronize peer cluster connection attempts to avoid races and rate
172  // limit connections when multiple replication sources try to connect to
173  // the peer cluster. If the peer cluster is down we can get out of control
174  // over time.
175  public UUID getPeerUUID() {
176    UUID peerUUID = null;
177    try {
178      synchronized (zkwLock) {
179        peerUUID = ZKClusterId.getUUIDForCluster(zkw);
180      }
181    } catch (KeeperException ke) {
182      reconnect(ke);
183    }
184    return peerUUID;
185  }
186
187  /**
188   * Closes the current ZKW (if not null) and creates a new one
189   * @throws IOException If anything goes wrong connecting
190   */
191  private void reloadZkWatcher() throws IOException {
192    synchronized (zkwLock) {
193      if (zkw != null) {
194        zkw.close();
195      }
196      zkw =
197        new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this);
198      zkw.registerListener(new PeerRegionServerListener(this));
199    }
200  }
201
202  private void connectPeerCluster() throws IOException {
203    try {
204      conn = createConnection(this.conf);
205    } catch (IOException ioe) {
206      LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
207      throw ioe;
208    }
209  }
210
211  @Override
212  public void abort(String why, Throwable e) {
213    LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
214      + " was aborted for the following reason(s):" + why, e);
215  }
216
217  @Override
218  public boolean isAborted() {
219    // Currently this is never "Aborted", we just log when the abort method is called.
220    return false;
221  }
222
223  /**
224   * Get the list of all the region servers from the specified peer
225   * @return list of region server addresses or an empty list if the slave is unavailable
226   */
227  protected List<ServerName> fetchSlavesAddresses() {
228    List<String> children = null;
229    try {
230      synchronized (zkwLock) {
231        children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
232      }
233    } catch (KeeperException ke) {
234      if (LOG.isDebugEnabled()) {
235        LOG.debug("Fetch slaves addresses failed", ke);
236      }
237      reconnect(ke);
238    }
239    if (children == null) {
240      return Collections.emptyList();
241    }
242    List<ServerName> addresses = new ArrayList<>(children.size());
243    for (String child : children) {
244      addresses.add(ServerName.parseServerName(child));
245    }
246    return addresses;
247  }
248
249  protected synchronized void chooseSinks() {
250    List<ServerName> slaveAddresses = fetchSlavesAddresses();
251    if (slaveAddresses.isEmpty()) {
252      LOG.warn("No sinks available at peer. Will not be able to replicate");
253    }
254    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
255    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
256    this.sinkServers = slaveAddresses.subList(0, numSinks);
257    badReportCounts.clear();
258  }
259
260  protected synchronized int getNumSinks() {
261    return sinkServers.size();
262  }
263
264  /**
265   * Get a randomly-chosen replication sink to replicate to.
266   * @return a replication sink to replicate to
267   */
268  protected synchronized SinkPeer getReplicationSink() throws IOException {
269    if (sinkServers.isEmpty()) {
270      LOG.info("Current list of sinks is out of date or empty, updating");
271      chooseSinks();
272    }
273    if (sinkServers.isEmpty()) {
274      throw new IOException("No replication sinks are available");
275    }
276    ServerName serverName =
277      sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
278    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
279  }
280
281  /**
282   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it failed). If a single
283   * SinkPeer is reported as bad more than replication.bad.sink.threshold times, it will be removed
284   * from the pool of potential replication targets.
285   * @param sinkPeer The SinkPeer that had a failed replication attempt on it
286   */
287  protected synchronized void reportBadSink(SinkPeer sinkPeer) {
288    ServerName serverName = sinkPeer.getServerName();
289    int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1);
290    if (badReportCount > badSinkThreshold) {
291      this.sinkServers.remove(serverName);
292      if (sinkServers.isEmpty()) {
293        chooseSinks();
294      }
295    }
296  }
297
298  /**
299   * Report that a {@code SinkPeer} successfully replicated a chunk of data. The SinkPeer that had a
300   * failed replication attempt on it
301   */
302  protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
303    badReportCounts.remove(sinkPeer.getServerName());
304  }
305
306  List<ServerName> getSinkServers() {
307    return sinkServers;
308  }
309
310  /**
311   * Tracks changes to the list of region servers in a peer's cluster.
312   */
313  public static class PeerRegionServerListener extends ZKListener {
314
315    private final HBaseReplicationEndpoint replicationEndpoint;
316    private final String regionServerListNode;
317
318    public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
319      super(endpoint.zkw);
320      this.replicationEndpoint = endpoint;
321      this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
322    }
323
324    @Override
325    public synchronized void nodeChildrenChanged(String path) {
326      if (path.equals(regionServerListNode)) {
327        LOG.info("Detected change to peer region servers, fetching updated list");
328        replicationEndpoint.chooseSinks();
329      }
330    }
331  }
332
333  /**
334   * Wraps a replication region server sink to provide the ability to identify it.
335   */
336  public static class SinkPeer {
337    private ServerName serverName;
338    private AsyncRegionServerAdmin regionServer;
339
340    public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
341      this.serverName = serverName;
342      this.regionServer = regionServer;
343    }
344
345    ServerName getServerName() {
346      return serverName;
347    }
348
349    public AsyncRegionServerAdmin getRegionServer() {
350      return regionServer;
351    }
352  }
353}