/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Manages and performs all replication admin operations.
 * <p>
 * Used to add/remove a replication peer.
 * <p>
 * Implements {@link ConfigurationObserver}, mainly for recreating {@link ReplicationPeerStorage},
 * to support migrating across different replication peer storages without restarting the master.
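 * <p>
 * A minimal usage sketch (illustrative only; the peer id and cluster key below are made-up
 * examples):
 *
 * <pre>
 * ReplicationPeerManager manager = ReplicationPeerManager.create(services, clusterId);
 * manager.addPeer("1",
 *   ReplicationPeerConfig.newBuilder().setClusterKey("zk1:2181:/hbase").build(), true);
 * manager.disablePeer("1");
 * </pre>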
 */
@InterfaceAudience.Private
public class ReplicationPeerManager implements ConfigurationObserver {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);

  private volatile ReplicationPeerStorage peerStorage;

  private final ReplicationQueueStorage queueStorage;

  private final ConcurrentMap<String, ReplicationPeerDescription> peers;

  private final ImmutableMap<SyncReplicationState,
    EnumSet<SyncReplicationState>> allowedTransition =
      Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
        EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
        SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
        SyncReplicationState.DOWNGRADE_ACTIVE,
        EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));

  private final String clusterId;

  private volatile Configuration conf;

  // for dynamically recreating the ReplicationPeerStorage.
  private final FileSystem fs;

  private final ZKWatcher zk;

  @FunctionalInterface
  interface ReplicationQueueStorageInitializer {

    void initialize() throws IOException;
  }

  private final ReplicationQueueStorageInitializer queueStorageInitializer;

  // we will mock this class in UTs, so leave the constructor as package private and do not mark
  // the class as final, since mockito cannot mock a final class
  ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
    ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
    Configuration conf, String clusterId,
    ReplicationQueueStorageInitializer queueStorageInitializer) {
    this.fs = fs;
    this.zk = zk;
    this.peerStorage = peerStorage;
    this.queueStorage = queueStorage;
    this.peers = peers;
    this.conf = conf;
    this.clusterId = clusterId;
    this.queueStorageInitializer = queueStorageInitializer;
  }

  private void checkQueuesDeleted(String peerId)
    throws ReplicationException, DoNotRetryIOException {
    List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(peerId);
    if (!queueIds.isEmpty()) {
      throw new DoNotRetryIOException("There are still " + queueIds.size()
        + " undeleted queue(s) for peerId: " + peerId + ", first is " + queueIds.get(0));
    }
    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
    }
  }

  private void initializeQueueStorage() throws IOException {
    queueStorageInitializer.initialize();
  }

  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException, IOException {
    if (peerId.contains("-")) {
      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
    }
    checkPeerConfig(peerConfig);
    if (peerConfig.isSyncReplication()) {
      checkSyncReplicationPeerConfigConflict(peerConfig);
    }
    if (peers.containsKey(peerId)) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
    }

    // lazily create the table
    initializeQueueStorage();
    // make sure that there are no queues with the same peer id. This may happen when we create a
    // peer with the same id as an old deleted peer. If the replication queues for the old peer
    // have not been cleaned up yet, we should not create the new peer, otherwise the old wal
    // files may also be replicated.
    checkQueuesDeleted(peerId);
  }

  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc == null) {
      throw new ReplicationPeerNotFoundException(peerId);
    }
    return desc;
  }

  private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (
      desc != null && desc.getPeerConfig().isSyncReplication()
        && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())
    ) {
      throw new DoNotRetryIOException(
        "Couldn't remove synchronous replication peer with state=" + desc.getSyncReplicationState()
          + ", transit the synchronous replication state to DOWNGRADE_ACTIVE first.");
    }
  }

  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription pd = checkPeerExists(peerId);
    checkPeerInDAStateIfSyncReplication(peerId);
    return pd.getPeerConfig();
  }

  void preEnablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
    }
  }

  void preDisablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (!desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
    }
  }

  /**
   * Return the old peer description. Can never be null.
   */
  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    checkPeerConfig(peerConfig);
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
      throw new DoNotRetryIOException(
        "Changing the cluster key on an existing peer is not allowed. Existing key '"
          + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '"
          + peerConfig.getClusterKey() + "'");
    }

    if (
      !isStringEquals(peerConfig.getReplicationEndpointImpl(),
        oldPeerConfig.getReplicationEndpointImpl())
    ) {
      throw new DoNotRetryIOException("Changing the replication endpoint implementation class "
        + "on an existing peer is not allowed. Existing class '"
        + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId
        + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
    }

    if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
      throw new DoNotRetryIOException(
        "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal "
          + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId
          + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
    }

    if (oldPeerConfig.isSyncReplication()) {
      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
        throw new DoNotRetryIOException(
          "Changing the replicated namespace/table config on a synchronous replication "
            + "peer(peerId: " + peerId + ") is not allowed.");
      }
    }
    return desc;
  }

  /** Returns the old description of the peer */
  ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
    SyncReplicationState state) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    SyncReplicationState fromState = desc.getSyncReplicationState();
    EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
    if (allowedToStates == null || !allowedToStates.contains(state)) {
      throw new DoNotRetryIOException("Cannot transit current cluster state from " + fromState
        + " to " + state + " for peer id=" + peerId);
    }
    return desc;
  }

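  /**
   * Add a new replication peer. The operation is idempotent: if the peer already exists the call
   * is treated as a retry and returns directly. A minimal usage sketch (illustrative only; the
   * peer id and cluster key are made-up examples):
   *
   * <pre>
   * manager.addPeer("1",
   *   ReplicationPeerConfig.newBuilder().setClusterKey("zk1:2181:/hbase").build(), true);
   * </pre>
   */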
  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
    throws ReplicationException {
    if (peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
    SyncReplicationState syncReplicationState = copiedPeerConfig.isSyncReplication()
      ? SyncReplicationState.DOWNGRADE_ACTIVE
      : SyncReplicationState.NONE;
    peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
    peers.put(peerId,
      new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
  }

  public void removePeer(String peerId) throws ReplicationException {
    if (!peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerStorage.removePeer(peerId);
    peers.remove(peerId);
  }

  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.isEnabled() == enabled) {
      // this should be a retry, just return
      return;
    }
    peerStorage.setPeerState(peerId, enabled);
    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
      desc.getSyncReplicationState()));
  }

  public boolean getPeerState(String peerId) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc != null) {
      return desc.isEnabled();
    } else {
      throw new ReplicationException("Replication peer " + peerId + " does not exist.");
    }
  }

  public void enablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, true);
  }

  public void disablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, false);
  }

  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    // the checking rules are too complicated here so we give up checking whether this is a retry.
    ReplicationPeerDescription desc = peers.get(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    ReplicationPeerConfigBuilder newPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(peerConfig);
    // we need to use the new conf to overwrite the old one.
    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
    peerStorage.updatePeerConfig(peerId, newPeerConfig);
    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
      desc.getSyncReplicationState()));
  }

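  /**
   * List the descriptions of the replication peers whose peer id matches the given pattern, or
   * all peers if the pattern is {@code null}. For example (illustrative pattern only):
   *
   * <pre>
   * List&lt;ReplicationPeerDescription&gt; matched =
   *   manager.listPeers(Pattern.compile("remote_.*"));
   * </pre>
   */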
  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
    if (pattern == null) {
      return new ArrayList<>(peers.values());
    }
    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
      .collect(Collectors.toList());
  }

  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
    ReplicationPeerDescription desc = peers.get(peerId);
    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
  }

  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
    queueStorage.removeLastSequenceIds(peerId);
  }

  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
    throws ReplicationException {
    peerStorage.setPeerNewSyncReplicationState(peerId, state);
  }

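  /**
   * Transit the persisted sync replication state of the given peer to {@code newState}. Per
   * {@link #allowedTransition}, the allowed transitions are ACTIVE -&gt; DOWNGRADE_ACTIVE or
   * STANDBY, STANDBY -&gt; DOWNGRADE_ACTIVE, and DOWNGRADE_ACTIVE -&gt; STANDBY or ACTIVE; they
   * are enforced in {@link #preTransitPeerSyncReplicationState(String, SyncReplicationState)}.
   */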
  public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
    throws ReplicationException {
    if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
      // Only transit if this is not a retry
      peerStorage.transitPeerSyncReplicationState(peerId);
    }
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.getSyncReplicationState() != newState) {
      // Only recreate the desc if this is not a retry
      peers.put(peerId,
        new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
    }
  }

  public void removeAllQueues(String peerId) throws ReplicationException {
    // Here we need two passes to address a problem with claimQueue. A claimQueue may still be
    // on-going when the refresh peer config procedure is done: if a RS which has already been
    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
    // the scan here, and if the RS which has claimed the queue crashes before creating the
    // recovered source, the queue will remain there until another RS detects the crash and helps
    // remove it.
    // A two pass scan can solve the problem. The queue will not disappear during the claiming; it
    // will be either under the old RS or under the new RS, and a queue can only be claimed once
    // after the refresh peer procedure is done (the next claim queue will just delete it), so we
    // can make sure that a two pass scan will finally find the queue and remove it, unless it has
    // already been removed by others.
    queueStorage.removeAllQueues(peerId);
    queueStorage.removeAllQueues(peerId);
  }

  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    removeAllQueues(peerId);
    queueStorage.removePeerFromHFileRefs(peerId);
  }

  private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint)
    throws DoNotRetryIOException {
    if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) {
      return;
    }
    // Endpoints implementing HBaseReplicationEndpoint need to check the cluster key
    URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey);
    try {
      if (connectionUri != null) {
        ConnectionRegistryFactory.validate(connectionUri);
      } else {
        ZKConfig.validateClusterKey(clusterKey);
      }
    } catch (IOException e) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
    }
    if (endpoint != null && endpoint.canReplicateToSameCluster()) {
      return;
    }
    // make sure we do not replicate to the same cluster
    String peerClusterId;
    try {
      if (connectionUri != null) {
        // fetch the cluster id through the standard admin API
        try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf);
          Admin admin = conn.getAdmin()) {
          peerClusterId =
            admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId();
        }
      } else {
        // Create the peer cluster configuration to get the peer cluster id
        Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
        try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
          peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
        }
      }
    } catch (IOException | KeeperException e) {
      // we just want to check whether we will replicate to the same cluster, so if we get an error
      // while getting the cluster id of the peer cluster, it means we are not connecting to
      // ourselves, as we are still alive. So here we just log the error and continue
      LOG.warn("Can't get peerClusterId for clusterKey=" + clusterKey, e);
      return;
    }
    // In a rare case, the zookeeper setting may be messed up. That leads to an incorrect
    // peerClusterId value, which is the same as the source clusterId
    if (clusterId.equals(peerClusterId)) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
    }
  }

  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
    ReplicationEndpoint endpoint = null;
    if (!StringUtils.isBlank(replicationEndpointImpl)) {
      try {
        // try creating an instance
        endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class)
          .getDeclaredConstructor().newInstance();
      } catch (Throwable e) {
        throw new DoNotRetryIOException(
          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
          e);
      }
    }
    checkClusterKey(peerConfig.getClusterKey(), endpoint);

    if (peerConfig.replicateAllUserTables()) {
      // If the replicate_all flag is true, it means all user tables will be replicated to the peer
      // cluster. Then we only allow configuring exclude namespaces or exclude table-cfs which
      // can't be replicated to the peer cluster.
      if (
        (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
      ) {
        throw new DoNotRetryIOException("Need to clean the namespaces or table-cfs config first "
          + "when you want to replicate the whole cluster");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
        peerConfig.getExcludeTableCFsMap());
    } else {
      // If the replicate_all flag is false, it means all user tables can't be replicated to the
      // peer cluster by default. Then we only allow configuring namespaces or table-cfs which
      // will be replicated to the peer cluster.
      if (
        (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty())
          || (peerConfig.getExcludeTableCFsMap() != null
            && !peerConfig.getExcludeTableCFsMap().isEmpty())
      ) {
        throw new DoNotRetryIOException(
          "Need to clean the exclude-namespaces or exclude-table-cfs config first"
            + " when the replicate_all flag is false");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
        peerConfig.getTableCFsMap());
    }

    if (peerConfig.isSyncReplication()) {
      checkPeerConfigForSyncReplication(peerConfig);
    }

    checkConfiguredWALEntryFilters(peerConfig);
  }

  private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    // This is used to reduce the difficulty of implementing the sync replication state
    // transition, as we need to reopen all the related regions.
    // TODO: Add namespace, replicate_all flag back
    if (peerConfig.replicateAllUserTables()) {
      throw new DoNotRetryIOException(
        "Only support replicated table config for sync replication peer");
    }
    if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
      throw new DoNotRetryIOException(
        "Only support replicated table config for sync replication peer");
    }
    if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
      throw new DoNotRetryIOException(
        "Need to configure replicated tables for a sync replication peer");
    }
    for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
      if (cfs != null && !cfs.isEmpty()) {
        throw new DoNotRetryIOException(
          "Only support replicated table config for sync replication peer");
      }
    }

    Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
    if (!remoteWALDir.isAbsolute()) {
      throw new DoNotRetryIOException(
        "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
    }
    URI remoteWALDirUri = remoteWALDir.toUri();
    if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
      throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir()
        + " is not qualified, you must provide the scheme and authority");
    }
  }

  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
          throw new DoNotRetryIOException(
            "Table " + tableName + " has been replicated by peer " + entry.getKey());
        }
      }
    }
  }

  /**
   * Setting a namespace in the peer config means that all tables in this namespace will be
   * replicated to the peer cluster.
   * <ol>
   * <li>If the peer config already has a namespace, then it is not allowed to set any table of
   * this namespace in the peer config.</li>
   * <li>If the peer config already has a table, then it is not allowed to set this table's
   * namespace in the peer config.</li>
   * </ol>
   * <p>
   * Setting an exclude namespace in the peer config means that all tables in this namespace can't
   * be replicated to the peer cluster.
   * <ol>
   * <li>If the peer config already has an exclude namespace, then it is not allowed to set any
   * exclude table of this namespace in the peer config.</li>
   * <li>If the peer config already has an exclude table, then it is not allowed to set this
   * table's namespace as an exclude namespace.</li>
   * </ol>
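   * <p>
   * For example (illustrative names only), a config that contains namespace {@code ns1} conflicts
   * with a table-cfs entry for table {@code ns1:t1}, and the same applies to the exclude lists.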
   */
  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
    Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
    if (namespaces == null || namespaces.isEmpty()) {
      return;
    }
    if (tableCfs == null || tableCfs.isEmpty()) {
      return;
    }
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (namespaces.contains(table.getNamespaceAsString())) {
        throw new DoNotRetryIOException("Table-cfs " + table + " conflicts with namespace "
          + table.getNamespaceAsString() + " in the peer config");
      }
    }
  }

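  /**
   * Verify that every WALEntryFilter class named in the peer's
   * {@link BaseReplicationEndpoint#REPLICATION_WALENTRYFILTER_CONFIG_KEY} configuration value (a
   * comma-separated list of class names) can be instantiated, and fail the add/update peer
   * operation otherwise.
   */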
  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
    String filterCSV = peerConfig.getConfiguration()
      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
    if (filterCSV != null && !filterCSV.isEmpty()) {
      String[] filters = filterCSV.split(",");
      for (String filter : filters) {
        try {
          Class.forName(filter).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter
            + " could not be created. Failing add/update peer operation.", e);
        }
      }
    }
  }

  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
      .collect(Collectors.toList());
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public ReplicationPeerStorage getPeerStorage() {
    return peerStorage;
  }

  public ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

  private static Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer>
    createReplicationQueueStorage(MasterServices services) throws IOException {
    Configuration conf = services.getConfiguration();
    TableName replicationQueueTableName =
      TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
    ReplicationQueueStorageInitializer initializer;
    if (services.getTableDescriptors().exists(replicationQueueTableName)) {
      // no need to create the table
      initializer = () -> {
      };
    } else {
      // lazily create the replication table.
      initializer = new ReplicationQueueStorageInitializer() {

        private volatile boolean created = false;

        @Override
        public void initialize() throws IOException {
          if (created) {
            return;
          }
          synchronized (this) {
            if (created) {
              return;
            }
            if (services.getTableDescriptors().exists(replicationQueueTableName)) {
              created = true;
              return;
            }
            long procId = services.createSystemTable(ReplicationStorageFactory
              .createReplicationQueueTableDescriptor(replicationQueueTableName));
            ProcedureExecutor<MasterProcedureEnv> procExec = services.getMasterProcedureExecutor();
            ProcedureSyncWait.waitFor(procExec.getEnvironment(), TimeUnit.MINUTES.toMillis(1),
              "Creating table " + replicationQueueTableName, () -> procExec.isFinished(procId));
          }
        }
      };
    }
    return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage(
      services.getConnection(), conf, replicationQueueTableName), initializer);
  }

  public static ReplicationPeerManager create(MasterServices services, String clusterId)
    throws ReplicationException, IOException {
    Configuration conf = services.getConfiguration();
    FileSystem fs = services.getMasterFileSystem().getFileSystem();
    ZKWatcher zk = services.getZooKeeper();
    ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
    Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer> pair =
      createReplicationQueueStorage(services);
    ReplicationQueueStorage queueStorage = pair.getFirst();
    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
    for (String peerId : peerStorage.listPeerIds()) {
      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
      if (
        ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
          .equals(peerConfig.getReplicationEndpointImpl())
      ) {
        // we do not use this endpoint for region replication any more, see HBASE-26233
        LOG.info("Legacy region replication peer found, removing: {}", peerConfig);
        // do it asynchronously to not block the startup of HMaster
        new Thread("Remove legacy replication peer " + peerId) {

          @Override
          public void run() {
            try {
              // need to delete twice to make sure we delete all the queues, see the comments in
              // the removeAllQueues method above for more details.
              queueStorage.removeAllQueues(peerId);
              queueStorage.removeAllQueues(peerId);
              // delete the queues first and then the peer, because we use the peer as a flag.
              peerStorage.removePeer(peerId);
            } catch (Exception e) {
              LOG.warn("Failed to delete legacy replication peer {}", peerId, e);
            }
          }
        }.start();
        continue;
      }
      peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
      peerStorage.updatePeerConfig(peerId, peerConfig);
      boolean enabled = peerStorage.isPeerEnabled(peerId);
      SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
    }
    return new ReplicationPeerManager(fs, zk, peerStorage, queueStorage, peers, conf, clusterId,
      pair.getSecond());
  }

  /**
   * For a replication peer cluster key or endpoint class, null and an empty string are the same,
   * so here we can not use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
   */
  private boolean isStringEquals(String s1, String s2) {
    if (StringUtils.isBlank(s1)) {
      return StringUtils.isBlank(s2);
    }
    return s1.equals(s2);
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
    this.conf = conf;
    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
  }

  private ReplicationQueueData convert(ZkReplicationQueueData zkData) {
    Map<String, ReplicationGroupOffset> groupOffsets = new HashMap<>();
    zkData.getWalOffsets().forEach((wal, offset) -> {
      String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
      groupOffsets.compute(walGroup, (k, oldOffset) -> {
        if (oldOffset == null) {
          return new ReplicationGroupOffset(wal, offset);
        }
        // we should record the offset of the first (oldest) wal, i.e. the smallest timestamp
        long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal());
        long walTs = AbstractFSWALProvider.getTimestamp(wal);
        if (walTs < oldWalTs) {
          return new ReplicationGroupOffset(wal, offset);
        }
        return oldOffset;
      });
    });
    return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets));
  }

  private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage)
    throws Exception {
    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
      oldQueueStorage.listAllQueues();
    for (;;) {
      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
      if (pair == null) {
        return;
      }
      queueStorage.batchUpdateQueues(pair.getFirst(),
        pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId()))
          .map(this::convert).collect(Collectors.toList()));
    }
  }

  private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage)
    throws Exception {
    MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds();
    for (;;) {
      List<ZkLastPushedSeqId> list = iter.next();
      if (list == null) {
        return;
      }
      queueStorage.batchUpdateLastSequenceIds(list.stream()
        .filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList()));
    }
  }

  private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage)
    throws Exception {
    MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs();
    for (;;) {
      Pair<String, List<String>> pair = iter.next();
      if (pair == null) {
        return;
      }
      if (peers.containsKey(pair.getFirst())) {
        queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond());
      }
    }
  }

  private interface ExceptionalRunnable {
    void run() throws Exception;
  }

  private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
    CompletableFuture<?> future = new CompletableFuture<>();
    executor.execute(() -> {
      try {
        task.run();
        future.complete(null);
      } catch (Exception e) {
        future.completeExceptionally(e);
      }
    });
    return future;
  }

  /**
   * Submit the migration tasks to the given {@code executor}.
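   * <p>
   * A usage sketch (illustrative only; the executor is supplied and shut down by the caller):
   *
   * <pre>
   * ExecutorService executor = Executors.newFixedThreadPool(3);
   * manager.migrateQueuesFromZk(zkWatcher, executor).get();
   * </pre>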
   */
  CompletableFuture<Void> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
    // the replication queue table creation is asynchronous and will be triggered by addPeer, so
    // here we need to manually initialize it since we will not call addPeer.
    try {
      initializeQueueStorage();
    } catch (IOException e) {
      return FutureUtils.failedFuture(e);
    }
    ZKReplicationQueueStorageForMigration oldStorage =
      new ZKReplicationQueueStorageForMigration(zookeeper, conf);
    return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
      runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
      runAsync(() -> migrateHFileRefs(oldStorage), executor));
  }
}