001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
066
067/**
068 * Manages and performs all replication admin operations.
069 * <p>
070 * Used to add/remove a replication peer.
071 */
072@InterfaceAudience.Private
073public class ReplicationPeerManager {
074
075  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);
076
077  private final ReplicationPeerStorage peerStorage;
078
079  private final ReplicationQueueStorage queueStorage;
080
081  private final ConcurrentMap<String, ReplicationPeerDescription> peers;
082
083  private final ImmutableMap<SyncReplicationState,
084    EnumSet<SyncReplicationState>> allowedTransition =
085      Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
086        EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
087        SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
088        SyncReplicationState.DOWNGRADE_ACTIVE,
089        EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
090
091  // Only allow to add one sync replication peer concurrently
092  private final Semaphore syncReplicationPeerLock = new Semaphore(1);
093
094  private final String clusterId;
095
096  private final Configuration conf;
097
098  ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
099    ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
100    this.peerStorage = peerStorage;
101    this.queueStorage = queueStorage;
102    this.peers = peers;
103    this.conf = conf;
104    this.clusterId = clusterId;
105  }
106
107  private void checkQueuesDeleted(String peerId)
108    throws ReplicationException, DoNotRetryIOException {
109    for (ServerName replicator : queueStorage.getListOfReplicators()) {
110      List<String> queueIds = queueStorage.getAllQueues(replicator);
111      for (String queueId : queueIds) {
112        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
113        if (queueInfo.getPeerId().equals(peerId)) {
114          throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + ", replicator: "
115            + replicator + ", queueId: " + queueId);
116        }
117      }
118    }
119    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
120      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
121    }
122  }
123
124  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
125    throws DoNotRetryIOException, ReplicationException {
126    if (peerId.contains("-")) {
127      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
128    }
129    checkPeerConfig(peerConfig);
130    if (peerConfig.isSyncReplication()) {
131      checkSyncReplicationPeerConfigConflict(peerConfig);
132    }
133    if (peers.containsKey(peerId)) {
134      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
135    }
136    // make sure that there is no queues with the same peer id. This may happen when we create a
137    // peer with the same id with a old deleted peer. If the replication queues for the old peer
138    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
139    // file may also be replicated.
140    checkQueuesDeleted(peerId);
141  }
142
143  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
144    ReplicationPeerDescription desc = peers.get(peerId);
145    if (desc == null) {
146      throw new ReplicationPeerNotFoundException(peerId);
147    }
148    return desc;
149  }
150
151  private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException {
152    ReplicationPeerDescription desc = peers.get(peerId);
153    if (
154      desc != null && desc.getPeerConfig().isSyncReplication()
155        && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())
156    ) {
157      throw new DoNotRetryIOException(
158        "Couldn't remove synchronous replication peer with state=" + desc.getSyncReplicationState()
159          + ", Transit the synchronous replication state to be DOWNGRADE_ACTIVE firstly.");
160    }
161  }
162
163  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
164    ReplicationPeerDescription pd = checkPeerExists(peerId);
165    checkPeerInDAStateIfSyncReplication(peerId);
166    return pd.getPeerConfig();
167  }
168
169  void preEnablePeer(String peerId) throws DoNotRetryIOException {
170    ReplicationPeerDescription desc = checkPeerExists(peerId);
171    if (desc.isEnabled()) {
172      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
173    }
174  }
175
176  void preDisablePeer(String peerId) throws DoNotRetryIOException {
177    ReplicationPeerDescription desc = checkPeerExists(peerId);
178    if (!desc.isEnabled()) {
179      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
180    }
181  }
182
183  /**
184   * Return the old peer description. Can never be null.
185   */
186  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
187    throws DoNotRetryIOException {
188    checkPeerConfig(peerConfig);
189    ReplicationPeerDescription desc = checkPeerExists(peerId);
190    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
191    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
192      throw new DoNotRetryIOException(
193        "Changing the cluster key on an existing peer is not allowed. Existing key '"
194          + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '"
195          + peerConfig.getClusterKey() + "'");
196    }
197
198    if (
199      !isStringEquals(peerConfig.getReplicationEndpointImpl(),
200        oldPeerConfig.getReplicationEndpointImpl())
201    ) {
202      throw new DoNotRetryIOException("Changing the replication endpoint implementation class "
203        + "on an existing peer is not allowed. Existing class '"
204        + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId
205        + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
206    }
207
208    if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
209      throw new DoNotRetryIOException(
210        "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal "
211          + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId
212          + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
213    }
214
215    if (oldPeerConfig.isSyncReplication()) {
216      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
217        throw new DoNotRetryIOException(
218          "Changing the replicated namespace/table config on a synchronous replication "
219            + "peer(peerId: " + peerId + ") is not allowed.");
220      }
221    }
222    return desc;
223  }
224
225  /** Returns the old desciption of the peer */
226  ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
227    SyncReplicationState state) throws DoNotRetryIOException {
228    ReplicationPeerDescription desc = checkPeerExists(peerId);
229    SyncReplicationState fromState = desc.getSyncReplicationState();
230    EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
231    if (allowedToStates == null || !allowedToStates.contains(state)) {
232      throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState
233        + " to " + state + " for peer id=" + peerId);
234    }
235    return desc;
236  }
237
238  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
239    throws ReplicationException {
240    if (peers.containsKey(peerId)) {
241      // this should be a retry, just return
242      return;
243    }
244    peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
245    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
246    SyncReplicationState syncReplicationState = copiedPeerConfig.isSyncReplication()
247      ? SyncReplicationState.DOWNGRADE_ACTIVE
248      : SyncReplicationState.NONE;
249    peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
250    peers.put(peerId,
251      new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
252  }
253
254  public void removePeer(String peerId) throws ReplicationException {
255    if (!peers.containsKey(peerId)) {
256      // this should be a retry, just return
257      return;
258    }
259    peerStorage.removePeer(peerId);
260    peers.remove(peerId);
261  }
262
263  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
264    ReplicationPeerDescription desc = peers.get(peerId);
265    if (desc.isEnabled() == enabled) {
266      // this should be a retry, just return
267      return;
268    }
269    peerStorage.setPeerState(peerId, enabled);
270    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
271      desc.getSyncReplicationState()));
272  }
273
274  public boolean getPeerState(String peerId) throws ReplicationException {
275    ReplicationPeerDescription desc = peers.get(peerId);
276    if (desc != null) {
277      return desc.isEnabled();
278    } else {
279      throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
280    }
281  }
282
283  public void enablePeer(String peerId) throws ReplicationException {
284    setPeerState(peerId, true);
285  }
286
287  public void disablePeer(String peerId) throws ReplicationException {
288    setPeerState(peerId, false);
289  }
290
291  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
292    throws ReplicationException {
293    // the checking rules are too complicated here so we give up checking whether this is a retry.
294    ReplicationPeerDescription desc = peers.get(peerId);
295    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
296    ReplicationPeerConfigBuilder newPeerConfigBuilder =
297      ReplicationPeerConfig.newBuilder(peerConfig);
298    // we need to use the new conf to overwrite the old one.
299    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
300    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
301    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
302    peerStorage.updatePeerConfig(peerId, newPeerConfig);
303    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
304      desc.getSyncReplicationState()));
305  }
306
307  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
308    if (pattern == null) {
309      return new ArrayList<>(peers.values());
310    }
311    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
312      .collect(Collectors.toList());
313  }
314
315  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
316    ReplicationPeerDescription desc = peers.get(peerId);
317    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
318  }
319
320  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
321    queueStorage.removeLastSequenceIds(peerId);
322  }
323
324  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
325    throws ReplicationException {
326    peerStorage.setPeerNewSyncReplicationState(peerId, state);
327  }
328
329  public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
330    throws ReplicationException {
331    if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
332      // Only transit if this is not a retry
333      peerStorage.transitPeerSyncReplicationState(peerId);
334    }
335    ReplicationPeerDescription desc = peers.get(peerId);
336    if (desc.getSyncReplicationState() != newState) {
337      // Only recreate the desc if this is not a retry
338      peers.put(peerId,
339        new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
340    }
341  }
342
343  public void removeAllQueues(String peerId) throws ReplicationException {
344    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
345    // on-going when the refresh peer config procedure is done, if a RS which has already been
346    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
347    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
348    // source, then the queue will leave there until the another RS detects the crash and helps
349    // removing the queue.
350    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
351    // claiming, it will either under the old RS or under the new RS, and a queue can only be
352    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
353    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
354    // unless it has already been removed by others.
355    ReplicationUtils.removeAllQueues(queueStorage, peerId);
356    ReplicationUtils.removeAllQueues(queueStorage, peerId);
357  }
358
359  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
360    removeAllQueues(peerId);
361    queueStorage.removePeerFromHFileRefs(peerId);
362  }
363
364  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
365    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
366    ReplicationEndpoint endpoint = null;
367    if (!StringUtils.isBlank(replicationEndpointImpl)) {
368      try {
369        // try creating a instance
370        endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class)
371          .getDeclaredConstructor().newInstance();
372      } catch (Throwable e) {
373        throw new DoNotRetryIOException(
374          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
375          e);
376      }
377    }
378    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
379    if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) {
380      checkClusterKey(peerConfig.getClusterKey());
381      // Check if endpoint can replicate to the same cluster
382      if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
383        checkSameClusterKey(peerConfig.getClusterKey());
384      }
385    }
386
387    if (peerConfig.replicateAllUserTables()) {
388      // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
389      // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
390      // cluster.
391      if (
392        (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
393          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
394      ) {
395        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
396          + "when you want replicate all cluster");
397      }
398      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
399        peerConfig.getExcludeTableCFsMap());
400    } else {
401      // If replicate_all flag is false, it means all user tables can't be replicated to peer
402      // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
403      // cluster.
404      if (
405        (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty())
406          || (peerConfig.getExcludeTableCFsMap() != null
407            && !peerConfig.getExcludeTableCFsMap().isEmpty())
408      ) {
409        throw new DoNotRetryIOException(
410          "Need clean exclude-namespaces or exclude-table-cfs config firstly"
411            + " when replicate_all flag is false");
412      }
413      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
414        peerConfig.getTableCFsMap());
415    }
416
417    if (peerConfig.isSyncReplication()) {
418      checkPeerConfigForSyncReplication(peerConfig);
419    }
420
421    checkConfiguredWALEntryFilters(peerConfig);
422  }
423
424  private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
425    throws DoNotRetryIOException {
426    // This is used to reduce the difficulty for implementing the sync replication state transition
427    // as we need to reopen all the related regions.
428    // TODO: Add namespace, replicat_all flag back
429    if (peerConfig.replicateAllUserTables()) {
430      throw new DoNotRetryIOException(
431        "Only support replicated table config for sync replication peer");
432    }
433    if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
434      throw new DoNotRetryIOException(
435        "Only support replicated table config for sync replication peer");
436    }
437    if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
438      throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
439    }
440    for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
441      if (cfs != null && !cfs.isEmpty()) {
442        throw new DoNotRetryIOException(
443          "Only support replicated table config for sync replication peer");
444      }
445    }
446
447    Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
448    if (!remoteWALDir.isAbsolute()) {
449      throw new DoNotRetryIOException(
450        "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
451    }
452    URI remoteWALDirUri = remoteWALDir.toUri();
453    if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
454      throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir()
455        + " is not qualified, you must provide scheme and authority");
456    }
457  }
458
459  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
460    throws DoNotRetryIOException {
461    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
462      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
463        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
464        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
465          throw new DoNotRetryIOException(
466            "Table " + tableName + " has been replicated by peer " + entry.getKey());
467        }
468      }
469    }
470  }
471
472  /**
473   * Set a namespace in the peer config means that all tables in this namespace will be replicated
474   * to the peer cluster.
475   * <ol>
476   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
477   * the peer config.</li>
478   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
479   * config.</li>
480   * </ol>
481   * <p>
482   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
483   * replicated to the peer cluster.
484   * <ol>
485   * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
486   * this namespace to the peer config.</li>
487   * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
488   * exclude namespace.</li>
489   * </ol>
490   */
491  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
492    Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
493    if (namespaces == null || namespaces.isEmpty()) {
494      return;
495    }
496    if (tableCfs == null || tableCfs.isEmpty()) {
497      return;
498    }
499    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
500      TableName table = entry.getKey();
501      if (namespaces.contains(table.getNamespaceAsString())) {
502        throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces "
503          + table.getNamespaceAsString() + " in peer config");
504      }
505    }
506  }
507
508  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
509    throws DoNotRetryIOException {
510    String filterCSV = peerConfig.getConfiguration()
511      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
512    if (filterCSV != null && !filterCSV.isEmpty()) {
513      String[] filters = filterCSV.split(",");
514      for (String filter : filters) {
515        try {
516          Class.forName(filter).getDeclaredConstructor().newInstance();
517        } catch (Exception e) {
518          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter
519            + " could not be created. Failing add/update peer operation.", e);
520        }
521      }
522    }
523  }
524
525  private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
526    try {
527      ZKConfig.validateClusterKey(clusterKey);
528    } catch (IOException e) {
529      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
530    }
531  }
532
533  private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException {
534    String peerClusterId = "";
535    try {
536      // Create the peer cluster config for get peer cluster id
537      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
538      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
539        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
540      }
541    } catch (IOException | KeeperException e) {
542      throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
543    }
544    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
545    // peerClusterId value, which is the same as the source clusterId
546    if (clusterId.equals(peerClusterId)) {
547      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
548        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
549    }
550  }
551
552  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
553    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
554      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
555      .collect(Collectors.toList());
556  }
557
558  public ReplicationQueueStorage getQueueStorage() {
559    return queueStorage;
560  }
561
562  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
563    throws ReplicationException {
564    ReplicationPeerStorage peerStorage =
565      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
566    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
567    for (String peerId : peerStorage.listPeerIds()) {
568      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
569      if (
570        ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
571          .equals(peerConfig.getReplicationEndpointImpl())
572      ) {
573        // we do not use this endpoint for region replication any more, see HBASE-26233
574        LOG.info("Legacy region replication peer found, removing: {}", peerConfig);
575        peerStorage.removePeer(peerId);
576        continue;
577      }
578      peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
579      peerStorage.updatePeerConfig(peerId, peerConfig);
580      boolean enabled = peerStorage.isPeerEnabled(peerId);
581      SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
582      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
583    }
584    return new ReplicationPeerManager(peerStorage,
585      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
586  }
587
588  /**
589   * For replication peer cluster key or endpoint class, null and empty string is same. So here
590   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
591   */
592  private boolean isStringEquals(String s1, String s2) {
593    if (StringUtils.isBlank(s1)) {
594      return StringUtils.isBlank(s2);
595    }
596    return s1.equals(s2);
597  }
598
599  public boolean tryAcquireSyncReplicationPeerLock() {
600    return syncReplicationPeerLock.tryAcquire();
601  }
602
603  public void releaseSyncReplicationPeerLock() {
604    syncReplicationPeerLock.release();
605  }
606}