/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Manages and performs all replication admin operations.
 * <p>
 * Used to add/remove a replication peer.
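 * <p>
 * An illustrative call sequence (simplified; in the running master these methods are invoked by
 * the peer modification procedures rather than called directly):
 *
 * <pre>
 * ReplicationPeerManager manager = ReplicationPeerManager.create(zkWatcher, conf);
 * manager.preAddPeer("1", peerConfig); // validate peer id and config first
 * manager.addPeer("1", peerConfig, true); // then persist the peer and cache its description
 * </pre>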
 */
@InterfaceAudience.Private
public class ReplicationPeerManager {

  private final ReplicationPeerStorage peerStorage;

  private final ReplicationQueueStorage queueStorage;

  private final ConcurrentMap<String, ReplicationPeerDescription> peers;

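  // Allowed sync replication state transitions, as encoded in the map below:
  //   ACTIVE           -> DOWNGRADE_ACTIVE or STANDBY
  //   STANDBY          -> DOWNGRADE_ACTIVE
  //   DOWNGRADE_ACTIVE -> STANDBY or ACTIVE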
  private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>>
    allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
      EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
      SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
      SyncReplicationState.DOWNGRADE_ACTIVE,
      EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));

  // Only allow adding one sync replication peer at a time
  private final Semaphore syncReplicationPeerLock = new Semaphore(1);

  ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
      ConcurrentMap<String, ReplicationPeerDescription> peers) {
    this.peerStorage = peerStorage;
    this.queueStorage = queueStorage;
    this.peers = peers;
  }

  private void checkQueuesDeleted(String peerId)
      throws ReplicationException, DoNotRetryIOException {
    for (ServerName replicator : queueStorage.getListOfReplicators()) {
      List<String> queueIds = queueStorage.getAllQueues(replicator);
      for (String queueId : queueIds) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        if (queueInfo.getPeerId().equals(peerId)) {
          throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
            ", replicator: " + replicator + ", queueId: " + queueId);
        }
      }
    }
    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
    }
  }

  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException, ReplicationException {
    if (peerId.contains("-")) {
      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
    }
    checkPeerConfig(peerConfig);
    if (peerConfig.isSyncReplication()) {
      checkSyncReplicationPeerConfigConflict(peerConfig);
    }
    if (peers.containsKey(peerId)) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
    }
    // Make sure that there are no queues with the same peer id. This may happen when we create a
    // peer with the same id as an old deleted peer. If the replication queues for the old peer
    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
    // file may also be replicated.
    checkQueuesDeleted(peerId);
  }

  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc == null) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist");
    }
    return desc;
  }

  private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc != null && desc.getPeerConfig().isSyncReplication()
        && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())) {
      throw new DoNotRetryIOException("Cannot remove synchronous replication peer with state="
          + desc.getSyncReplicationState()
          + ", transit the synchronous replication state to DOWNGRADE_ACTIVE first.");
    }
  }

  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription pd = checkPeerExists(peerId);
    checkPeerInDAStateIfSyncReplication(peerId);
    return pd.getPeerConfig();
  }

  void preEnablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
    }
  }

  void preDisablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (!desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
    }
  }

  /**
   * Return the old peer description. Can never be null.
   */
  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException {
    checkPeerConfig(peerConfig);
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
      throw new DoNotRetryIOException(
        "Changing the cluster key on an existing peer is not allowed. Existing key '" +
          oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
          peerConfig.getClusterKey() + "'");
    }

    if (!isStringEquals(peerConfig.getReplicationEndpointImpl(),
      oldPeerConfig.getReplicationEndpointImpl())) {
      throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
        "on an existing peer is not allowed. Existing class '" +
        oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
        " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
    }

    if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
      throw new DoNotRetryIOException(
        "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " +
          "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId +
          " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
    }

    if (oldPeerConfig.isSyncReplication()) {
      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
        throw new DoNotRetryIOException(
          "Changing the replicated namespace/table config on a synchronous replication " +
            "peer (peerId: " + peerId + ") is not allowed.");
      }
    }
    return desc;
  }

  /**
   * @return the old description of the peer
   */
  ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
      SyncReplicationState state) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    SyncReplicationState fromState = desc.getSyncReplicationState();
    EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
    if (allowedToStates == null || !allowedToStates.contains(state)) {
      throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState +
        " to " + state + " for peer id=" + peerId);
    }
    return desc;
  }

  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
      throws ReplicationException {
    if (peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
    SyncReplicationState syncReplicationState =
      copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
        : SyncReplicationState.NONE;
    peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
    peers.put(peerId,
      new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
  }

  public void removePeer(String peerId) throws ReplicationException {
    if (!peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerStorage.removePeer(peerId);
    peers.remove(peerId);
  }

  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.isEnabled() == enabled) {
      // this should be a retry, just return
      return;
    }
    peerStorage.setPeerState(peerId, enabled);
    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
      desc.getSyncReplicationState()));
  }

  public void enablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, true);
  }

  public void disablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, false);
  }

  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
      throws ReplicationException {
    // the checking rules are too complicated here so we give up checking whether this is a retry.
    ReplicationPeerDescription desc = peers.get(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    ReplicationPeerConfigBuilder newPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(peerConfig);
    // Put the old configuration first and then the new one, so that updated keys take the new
    // values while keys only present in the old configuration are preserved.
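    // For example (values illustrative): old configuration {k1=v1, k2=v2} merged with new
    // configuration {k2=v3} results in {k1=v1, k2=v3}.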
    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
    peerStorage.updatePeerConfig(peerId, newPeerConfig);
    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
      desc.getSyncReplicationState()));
  }

  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
    if (pattern == null) {
      return new ArrayList<>(peers.values());
    }
    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
      .collect(Collectors.toList());
  }

  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
    ReplicationPeerDescription desc = peers.get(peerId);
    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
  }

  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
    queueStorage.removeLastSequenceIds(peerId);
  }

  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
      throws ReplicationException {
    peerStorage.setPeerNewSyncReplicationState(peerId, state);
  }

  public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
      throws ReplicationException {
    if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
      // Only transit if this is not a retry
      peerStorage.transitPeerSyncReplicationState(peerId);
    }
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.getSyncReplicationState() != newState) {
      // Only recreate the desc if this is not a retry
      peers.put(peerId,
        new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
    }
  }

  public void removeAllQueues(String peerId) throws ReplicationException {
    // Here we need two passes to address a race with claimQueue. A claimQueue may still be
    // on-going when the refresh peer config procedure is done. If an RS which has already been
    // scanned claims the queue of an RS which has not been scanned yet, we will miss that queue in
    // the scan here, and if the RS which claimed the queue crashes before creating the recovered
    // source, the queue will be left there until another RS detects the crash and helps to remove
    // it.
    // A two pass scan solves the problem. The queue will not disappear during the claiming: it
    // will be either under the old RS or under the new RS, and a queue can only be claimed once
    // after the refresh peer procedure is done (as the next claim queue will just delete it), so
    // a two pass scan will finally find the queue and remove it, unless it has already been
    // removed by others.
    ReplicationUtils.removeAllQueues(queueStorage, peerId);
    ReplicationUtils.removeAllQueues(queueStorage, peerId);
  }

  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    removeAllQueues(peerId);
    queueStorage.removePeerFromHFileRefs(peerId);
  }

  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
    boolean checkClusterKey = true;
    if (!StringUtils.isBlank(replicationEndpointImpl)) {
      // try creating an instance
      ReplicationEndpoint endpoint;
      try {
        endpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (Throwable e) {
        throw new DoNotRetryIOException(
          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
          e);
      }
      // only check the cluster key if the endpoint is an HBaseInterClusterReplicationEndpoint
      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
        checkClusterKey = false;
      }
    }
    if (checkClusterKey) {
      checkClusterKey(peerConfig.getClusterKey());
    }

    if (peerConfig.replicateAllUserTables()) {
      // If the replicate_all flag is true, all user tables will be replicated to the peer cluster.
      // Only exclude-namespaces and exclude-table-cfs may be configured, to list what must not be
      // replicated to the peer cluster.
      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
        (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
        throw new DoNotRetryIOException("Need to clean the namespaces or table-cfs config first " +
          "when you want to replicate the whole cluster");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
        peerConfig.getExcludeTableCFsMap());
    } else {
      // If the replicate_all flag is false, no user table is replicated by default. Only
      // namespaces and table-cfs may be configured, to list what will be replicated to the peer
      // cluster.
      if ((peerConfig.getExcludeNamespaces() != null &&
        !peerConfig.getExcludeNamespaces().isEmpty()) ||
        (peerConfig.getExcludeTableCFsMap() != null &&
          !peerConfig.getExcludeTableCFsMap().isEmpty())) {
        throw new DoNotRetryIOException(
          "Need to clean the exclude-namespaces or exclude-table-cfs config first" +
            " when the replicate_all flag is false");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
        peerConfig.getTableCFsMap());
    }

    if (peerConfig.isSyncReplication()) {
      checkPeerConfigForSyncReplication(peerConfig);
    }

    checkConfiguredWALEntryFilters(peerConfig);
  }

  private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException {
    // This is used to reduce the difficulty of implementing the sync replication state transition,
    // as we need to reopen all the related regions.
    // TODO: Add namespace, replicate_all flag back
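    // Illustrative example of a config that passes the checks below (values hypothetical):
    // replicate_all = false, table-cfs = { ns1:t1 => null } (whole table, no per-CF list), and
    // remote WAL dir = "hdfs://peer-cluster:8020/remoteWALs" (absolute, with scheme and authority).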
    if (peerConfig.replicateAllUserTables()) {
      throw new DoNotRetryIOException(
        "Only support replicated table config for sync replication peer");
    }
    if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
      throw new DoNotRetryIOException(
        "Only support replicated table config for sync replication peer");
    }
    if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
      throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
    }
    for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
      if (cfs != null && !cfs.isEmpty()) {
        throw new DoNotRetryIOException(
          "Only support replicated table config for sync replication peer");
      }
    }

    Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
    if (!remoteWALDir.isAbsolute()) {
      throw new DoNotRetryIOException(
        "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
    }
    URI remoteWALDirUri = remoteWALDir.toUri();
    if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
      throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir() +
        " is not qualified, you must provide scheme and authority");
    }
  }

  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException {
    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
          throw new DoNotRetryIOException(
              "Table " + tableName + " has already been configured for sync replication by peer " +
                entry.getKey());
        }
      }
    }
  }
  /**
   * Setting a namespace in the peer config means that all tables in this namespace will be
   * replicated to the peer cluster.
   * <ol>
   * <li>If the peer config already contains a namespace, then it is not allowed to set any table
   * of this namespace in the peer config.</li>
   * <li>If the peer config already contains a table, then it is not allowed to set this table's
   * namespace in the peer config.</li>
   * </ol>
   * <p>
   * Setting an exclude namespace in the peer config means that all tables in this namespace
   * cannot be replicated to the peer cluster.
   * <ol>
   * <li>If the peer config already contains an exclude namespace, then it is not allowed to set
   * any exclude table of this namespace in the peer config.</li>
   * <li>If the peer config already contains an exclude table, then it is not allowed to set this
   * table's namespace as an exclude namespace.</li>
   * </ol>
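   * <p>
   * For example (illustrative): a peer config that contains namespace {@code ns1} must not also
   * contain table {@code ns1:t1} in its table-cfs map, and vice versa.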
   */
  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
      Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
    if (namespaces == null || namespaces.isEmpty()) {
      return;
    }
    if (tableCfs == null || tableCfs.isEmpty()) {
      return;
    }
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (namespaces.contains(table.getNamespaceAsString())) {
        throw new DoNotRetryIOException("Table-cfs " + table + " conflicts with namespace " +
          table.getNamespaceAsString() + " in peer config");
      }
    }
  }

  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException {
    String filterCSV = peerConfig.getConfiguration()
      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
    if (filterCSV != null && !filterCSV.isEmpty()) {
      String[] filters = filterCSV.split(",");
      for (String filter : filters) {
        try {
          Class.forName(filter).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
            " could not be created. Failing add/update peer operation.", e);
        }
      }
    }
  }

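  // A cluster key has the form "<zookeeper quorum>:<client port>:<znode parent>", for example
  // "zk1,zk2,zk3:2181:/hbase" (host names illustrative); ZKConfig.validateClusterKey checks that
  // the key follows this format.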
  private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
    try {
      ZKConfig.validateClusterKey(clusterKey);
    } catch (IOException e) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
    }
  }

  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
      .collect(Collectors.toList());
  }

  public ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
      throws ReplicationException {
    ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
    for (String peerId : peerStorage.listPeerIds()) {
      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
      boolean enabled = peerStorage.isPeerEnabled(peerId);
      SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
    }
    return new ReplicationPeerManager(peerStorage,
      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
  }

  /**
   * For the replication peer cluster key or endpoint class, null and an empty string are treated
   * as equal, so we cannot use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
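   * <p>
   * For example, {@code isStringEquals(null, "")} returns {@code true}.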
   */
  private boolean isStringEquals(String s1, String s2) {
    if (StringUtils.isBlank(s1)) {
      return StringUtils.isBlank(s2);
    }
    return s1.equals(s2);
  }

  public void acquireSyncReplicationPeerLock() throws InterruptedException {
    syncReplicationPeerLock.acquire();
  }

  public void releaseSyncReplicationPeerLock() {
    syncReplicationPeerLock.release();
  }
}