001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import com.google.errorprone.annotations.RestrictedApi;
021import java.io.IOException;
022import java.net.URI;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.EnumSet;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.CompletableFuture;
032import java.util.concurrent.ConcurrentHashMap;
033import java.util.concurrent.ConcurrentMap;
034import java.util.concurrent.ExecutorService;
035import java.util.concurrent.TimeUnit;
036import java.util.regex.Pattern;
037import java.util.stream.Collectors;
038import org.apache.commons.lang3.StringUtils;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
048import org.apache.hadoop.hbase.conf.ConfigurationObserver;
049import org.apache.hadoop.hbase.master.MasterServices;
050import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
051import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
052import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
053import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
054import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
055import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
056import org.apache.hadoop.hbase.replication.ReplicationException;
057import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
058import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
059import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
060import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
061import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
062import org.apache.hadoop.hbase.replication.ReplicationQueueData;
063import org.apache.hadoop.hbase.replication.ReplicationQueueId;
064import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
065import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
066import org.apache.hadoop.hbase.replication.ReplicationUtils;
067import org.apache.hadoop.hbase.replication.SyncReplicationState;
068import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
069import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
070import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
071import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
072import org.apache.hadoop.hbase.util.FutureUtils;
073import org.apache.hadoop.hbase.util.Pair;
074import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
075import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
076import org.apache.hadoop.hbase.zookeeper.ZKConfig;
077import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
078import org.apache.yetus.audience.InterfaceAudience;
079import org.apache.zookeeper.KeeperException;
080import org.slf4j.Logger;
081import org.slf4j.LoggerFactory;
082
083import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
084import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
085
086/**
087 * Manages and performs all replication admin operations.
088 * <p>
089 * Used to add/remove a replication peer.
090 * <p>
091 * Implement {@link ConfigurationObserver} mainly for recreating {@link ReplicationPeerStorage}, for
092 * supporting migrating across different replication peer storages without restarting master.
093 */
094@InterfaceAudience.Private
095public class ReplicationPeerManager implements ConfigurationObserver {
096
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerManager.class);

  // storage for the peer metadata; volatile because it is replaced in onConfigurationChange
  private volatile ReplicationPeerStorage peerStorage;

  // storage for the replication queues, last pushed sequence ids and hfile-refs
  private final ReplicationQueueStorage queueStorage;

  // in-memory view of the known peers, keyed by peer id
  private final ConcurrentMap<String, ReplicationPeerDescription> peers;

  // maps a sync replication state to the set of states it is allowed to transit to
  private final ImmutableMap<SyncReplicationState,
    EnumSet<SyncReplicationState>> allowedTransition =
      Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
        EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
        SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
        SyncReplicationState.DOWNGRADE_ACTIVE,
        EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));

  // compared with the cluster id of a peer cluster, to reject replicating to itself
  private final String clusterId;

  // volatile because it is replaced in onConfigurationChange
  private volatile Configuration conf;

  // fs and zk are kept for dynamically recreating the ReplicationPeerStorage.
  private final FileSystem fs;

  private final ZKWatcher zk;
121
  /**
   * Callback for preparing the replication queue storage. It is invoked before a new peer is
   * added.
   */
  @FunctionalInterface
  interface ReplicationQueueStorageInitializer {

    /**
     * Performs the initialization.
     * @throws IOException if the initialization fails
     */
    void initialize() throws IOException;
  }
127
128  private final ReplicationQueueStorageInitializer queueStorageInitializer;
129
130  // we will mock this class in UT so leave the constructor as package private and not mark the
131  // class as final, since mockito can not mock a final class
132  ReplicationPeerManager(FileSystem fs, ZKWatcher zk, ReplicationPeerStorage peerStorage,
133    ReplicationQueueStorage queueStorage, ConcurrentMap<String, ReplicationPeerDescription> peers,
134    Configuration conf, String clusterId,
135    ReplicationQueueStorageInitializer queueStorageInitializer) {
136    this.fs = fs;
137    this.zk = zk;
138    this.peerStorage = peerStorage;
139    this.queueStorage = queueStorage;
140    this.peers = peers;
141    this.conf = conf;
142    this.clusterId = clusterId;
143    this.queueStorageInitializer = queueStorageInitializer;
144  }
145
146  private void checkQueuesDeleted(String peerId)
147    throws ReplicationException, DoNotRetryIOException {
148    List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(peerId);
149    if (!queueIds.isEmpty()) {
150      throw new DoNotRetryIOException("There are still " + queueIds.size()
151        + " undeleted queue(s) for peerId: " + peerId + ", first is " + queueIds.get(0));
152    }
153    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
154      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
155    }
156  }
157
  /**
   * Runs the configured {@link ReplicationQueueStorageInitializer}.
   * @throws IOException if the initializer fails
   */
  private void initializeQueueStorage() throws IOException {
    queueStorageInitializer.initialize();
  }
161
162  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
163    throws ReplicationException, IOException {
164    if (peerId.contains("-")) {
165      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
166    }
167    checkPeerConfig(peerConfig);
168    if (peerConfig.isSyncReplication()) {
169      checkSyncReplicationPeerConfigConflict(peerConfig);
170    }
171    if (peers.containsKey(peerId)) {
172      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
173    }
174
175    // lazy create table
176    initializeQueueStorage();
177    // make sure that there is no queues with the same peer id. This may happen when we create a
178    // peer with the same id with a old deleted peer. If the replication queues for the old peer
179    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
180    // file may also be replicated.
181    checkQueuesDeleted(peerId);
182  }
183
184  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
185    ReplicationPeerDescription desc = peers.get(peerId);
186    if (desc == null) {
187      throw new ReplicationPeerNotFoundException(peerId);
188    }
189    return desc;
190  }
191
192  private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException {
193    ReplicationPeerDescription desc = peers.get(peerId);
194    if (
195      desc != null && desc.getPeerConfig().isSyncReplication()
196        && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())
197    ) {
198      throw new DoNotRetryIOException(
199        "Couldn't remove synchronous replication peer with state=" + desc.getSyncReplicationState()
200          + ", Transit the synchronous replication state to be DOWNGRADE_ACTIVE firstly.");
201    }
202  }
203
204  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
205    ReplicationPeerDescription pd = checkPeerExists(peerId);
206    checkPeerInDAStateIfSyncReplication(peerId);
207    return pd.getPeerConfig();
208  }
209
210  void preEnablePeer(String peerId) throws DoNotRetryIOException {
211    ReplicationPeerDescription desc = checkPeerExists(peerId);
212    if (desc.isEnabled()) {
213      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
214    }
215  }
216
217  void preDisablePeer(String peerId) throws DoNotRetryIOException {
218    ReplicationPeerDescription desc = checkPeerExists(peerId);
219    if (!desc.isEnabled()) {
220      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
221    }
222  }
223
224  /**
225   * Return the old peer description. Can never be null.
226   */
227  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
228    throws DoNotRetryIOException {
229    checkPeerConfig(peerConfig);
230    ReplicationPeerDescription desc = checkPeerExists(peerId);
231    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
232    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
233      throw new DoNotRetryIOException(
234        "Changing the cluster key on an existing peer is not allowed. Existing key '"
235          + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '"
236          + peerConfig.getClusterKey() + "'");
237    }
238
239    if (
240      !isStringEquals(peerConfig.getReplicationEndpointImpl(),
241        oldPeerConfig.getReplicationEndpointImpl())
242    ) {
243      throw new DoNotRetryIOException("Changing the replication endpoint implementation class "
244        + "on an existing peer is not allowed. Existing class '"
245        + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId
246        + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
247    }
248
249    if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) {
250      throw new DoNotRetryIOException(
251        "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal "
252          + "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId
253          + " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'");
254    }
255
256    if (oldPeerConfig.isSyncReplication()) {
257      if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) {
258        throw new DoNotRetryIOException(
259          "Changing the replicated namespace/table config on a synchronous replication "
260            + "peer(peerId: " + peerId + ") is not allowed.");
261      }
262    }
263    return desc;
264  }
265
266  /** Returns the old desciption of the peer */
267  ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId,
268    SyncReplicationState state) throws DoNotRetryIOException {
269    ReplicationPeerDescription desc = checkPeerExists(peerId);
270    SyncReplicationState fromState = desc.getSyncReplicationState();
271    EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState);
272    if (allowedToStates == null || !allowedToStates.contains(state)) {
273      throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState
274        + " to " + state + " for peer id=" + peerId);
275    }
276    return desc;
277  }
278
279  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
280    throws ReplicationException {
281    if (peers.containsKey(peerId)) {
282      // this should be a retry, just return
283      return;
284    }
285    peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
286    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
287    SyncReplicationState syncReplicationState = copiedPeerConfig.isSyncReplication()
288      ? SyncReplicationState.DOWNGRADE_ACTIVE
289      : SyncReplicationState.NONE;
290    peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
291    peers.put(peerId,
292      new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
293  }
294
295  public void removePeer(String peerId) throws ReplicationException {
296    if (!peers.containsKey(peerId)) {
297      // this should be a retry, just return
298      return;
299    }
300    peerStorage.removePeer(peerId);
301    peers.remove(peerId);
302  }
303
304  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
305    ReplicationPeerDescription desc = peers.get(peerId);
306    if (desc.isEnabled() == enabled) {
307      // this should be a retry, just return
308      return;
309    }
310    peerStorage.setPeerState(peerId, enabled);
311    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(),
312      desc.getSyncReplicationState()));
313  }
314
315  public boolean getPeerState(String peerId) throws ReplicationException {
316    ReplicationPeerDescription desc = peers.get(peerId);
317    if (desc != null) {
318      return desc.isEnabled();
319    } else {
320      throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
321    }
322  }
323
  /**
   * Marks the peer as enabled, see {@link #setPeerState(String, boolean)}.
   */
  public void enablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, true);
  }
327
  /**
   * Marks the peer as disabled, see {@link #setPeerState(String, boolean)}.
   */
  public void disablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, false);
  }
331
332  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
333    throws ReplicationException {
334    // the checking rules are too complicated here so we give up checking whether this is a retry.
335    ReplicationPeerDescription desc = peers.get(peerId);
336    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
337    ReplicationPeerConfigBuilder newPeerConfigBuilder =
338      ReplicationPeerConfig.newBuilder(peerConfig);
339    // we need to use the new conf to overwrite the old one.
340    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
341    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
342    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
343    peerStorage.updatePeerConfig(peerId, newPeerConfig);
344    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
345      desc.getSyncReplicationState()));
346  }
347
348  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
349    if (pattern == null) {
350      return new ArrayList<>(peers.values());
351    }
352    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
353      .collect(Collectors.toList());
354  }
355
356  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
357    ReplicationPeerDescription desc = peers.get(peerId);
358    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
359  }
360
  /**
   * Removes the last sequence ids of the given peer from the queue storage.
   */
  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
    queueStorage.removeLastSequenceIds(peerId);
  }
364
  /**
   * Records the new sync replication state of the peer in the peer storage. The in-memory peer
   * map is not touched here.
   */
  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
    throws ReplicationException {
    peerStorage.setPeerNewSyncReplicationState(peerId, state);
  }
369
370  public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
371    throws ReplicationException {
372    if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
373      // Only transit if this is not a retry
374      peerStorage.transitPeerSyncReplicationState(peerId);
375    }
376    ReplicationPeerDescription desc = peers.get(peerId);
377    if (desc.getSyncReplicationState() != newState) {
378      // Only recreate the desc if this is not a retry
379      peers.put(peerId,
380        new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
381    }
382  }
383
384  public void removeAllQueues(String peerId) throws ReplicationException {
385    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
386    // on-going when the refresh peer config procedure is done, if a RS which has already been
387    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
388    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
389    // source, then the queue will leave there until the another RS detects the crash and helps
390    // removing the queue.
391    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
392    // claiming, it will either under the old RS or under the new RS, and a queue can only be
393    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
394    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
395    // unless it has already been removed by others.
396    queueStorage.removeAllQueues(peerId);
397    queueStorage.removeAllQueues(peerId);
398  }
399
  /**
   * Removes all the replication queues of the given peer, see {@link #removeAllQueues(String)},
   * and then removes the peer from the hfile-refs in the queue storage.
   */
  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    removeAllQueues(peerId);
    queueStorage.removePeerFromHFileRefs(peerId);
  }
404
405  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
406    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
407    ReplicationEndpoint endpoint = null;
408    if (!StringUtils.isBlank(replicationEndpointImpl)) {
409      try {
410        // try creating a instance
411        endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class)
412          .getDeclaredConstructor().newInstance();
413      } catch (Throwable e) {
414        throw new DoNotRetryIOException(
415          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
416          e);
417      }
418    }
419    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
420    if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) {
421      checkClusterKey(peerConfig.getClusterKey());
422      // Check if endpoint can replicate to the same cluster
423      if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
424        checkSameClusterKey(peerConfig.getClusterKey());
425      }
426    }
427
428    if (peerConfig.replicateAllUserTables()) {
429      // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
430      // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
431      // cluster.
432      if (
433        (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
434          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
435      ) {
436        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
437          + "when you want replicate all cluster");
438      }
439      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
440        peerConfig.getExcludeTableCFsMap());
441    } else {
442      // If replicate_all flag is false, it means all user tables can't be replicated to peer
443      // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
444      // cluster.
445      if (
446        (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty())
447          || (peerConfig.getExcludeTableCFsMap() != null
448            && !peerConfig.getExcludeTableCFsMap().isEmpty())
449      ) {
450        throw new DoNotRetryIOException(
451          "Need clean exclude-namespaces or exclude-table-cfs config firstly"
452            + " when replicate_all flag is false");
453      }
454      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
455        peerConfig.getTableCFsMap());
456    }
457
458    if (peerConfig.isSyncReplication()) {
459      checkPeerConfigForSyncReplication(peerConfig);
460    }
461
462    checkConfiguredWALEntryFilters(peerConfig);
463  }
464
465  private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
466    throws DoNotRetryIOException {
467    // This is used to reduce the difficulty for implementing the sync replication state transition
468    // as we need to reopen all the related regions.
469    // TODO: Add namespace, replicat_all flag back
470    if (peerConfig.replicateAllUserTables()) {
471      throw new DoNotRetryIOException(
472        "Only support replicated table config for sync replication peer");
473    }
474    if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
475      throw new DoNotRetryIOException(
476        "Only support replicated table config for sync replication peer");
477    }
478    if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
479      throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
480    }
481    for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
482      if (cfs != null && !cfs.isEmpty()) {
483        throw new DoNotRetryIOException(
484          "Only support replicated table config for sync replication peer");
485      }
486    }
487
488    Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
489    if (!remoteWALDir.isAbsolute()) {
490      throw new DoNotRetryIOException(
491        "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
492    }
493    URI remoteWALDirUri = remoteWALDir.toUri();
494    if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
495      throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir()
496        + " is not qualified, you must provide scheme and authority");
497    }
498  }
499
500  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
501    throws DoNotRetryIOException {
502    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
503      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
504        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
505        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
506          throw new DoNotRetryIOException(
507            "Table " + tableName + " has been replicated by peer " + entry.getKey());
508        }
509      }
510    }
511  }
512
513  /**
514   * Set a namespace in the peer config means that all tables in this namespace will be replicated
515   * to the peer cluster.
516   * <ol>
517   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
518   * the peer config.</li>
519   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
520   * config.</li>
521   * </ol>
522   * <p>
523   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
524   * replicated to the peer cluster.
525   * <ol>
526   * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
527   * this namespace to the peer config.</li>
528   * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
529   * exclude namespace.</li>
530   * </ol>
531   */
532  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
533    Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
534    if (namespaces == null || namespaces.isEmpty()) {
535      return;
536    }
537    if (tableCfs == null || tableCfs.isEmpty()) {
538      return;
539    }
540    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
541      TableName table = entry.getKey();
542      if (namespaces.contains(table.getNamespaceAsString())) {
543        throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces "
544          + table.getNamespaceAsString() + " in peer config");
545      }
546    }
547  }
548
549  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
550    throws DoNotRetryIOException {
551    String filterCSV = peerConfig.getConfiguration()
552      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
553    if (filterCSV != null && !filterCSV.isEmpty()) {
554      String[] filters = filterCSV.split(",");
555      for (String filter : filters) {
556        try {
557          Class.forName(filter).getDeclaredConstructor().newInstance();
558        } catch (Exception e) {
559          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter
560            + " could not be created. Failing add/update peer operation.", e);
561        }
562      }
563    }
564  }
565
  /**
   * Validates the format of the cluster key with {@link ZKConfig#validateClusterKey(String)}.
   * @throws DoNotRetryIOException if the validation fails, wrapping the original exception
   */
  private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
    try {
      ZKConfig.validateClusterKey(clusterKey);
    } catch (IOException e) {
      // convert to DoNotRetryIOException, the original exception is kept as the cause
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
    }
  }
573
574  private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException {
575    String peerClusterId = "";
576    try {
577      // Create the peer cluster config for get peer cluster id
578      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
579      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
580        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
581      }
582    } catch (IOException | KeeperException e) {
583      throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
584    }
585    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
586    // peerClusterId value, which is the same as the source clusterId
587    if (clusterId.equals(peerClusterId)) {
588      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
589        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
590    }
591  }
592
593  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
594    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
595      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
596      .collect(Collectors.toList());
597  }
598
  /**
   * Returns the current peer storage. Should only be called in tests.
   */
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public ReplicationPeerStorage getPeerStorage() {
    return peerStorage;
  }
604
  /**
   * Returns the replication queue storage used by this manager.
   */
  public ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }
608
  /**
   * Creates the replication queue storage, together with an initializer for the table behind it.
   * <p>
   * The table name is read from the configuration. If the table already exists, the initializer
   * does nothing. Otherwise the initializer creates the table as a system table on demand, and
   * waits up to one minute for the create table procedure to finish.
   * @return a pair of the queue storage and its initializer
   */
  private static Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer>
    createReplicationQueueStorage(MasterServices services) throws IOException {
    Configuration conf = services.getConfiguration();
    TableName replicationQueueTableName =
      TableName.valueOf(conf.get(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME,
        ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
    ReplicationQueueStorageInitializer initializer;
    if (services.getTableDescriptors().exists(replicationQueueTableName)) {
      // no need to create the table
      initializer = () -> {
      };
    } else {
      // lazy create the replication table.
      initializer = new ReplicationQueueStorageInitializer() {

        // set once the table has been seen to exist, so later calls can return without locking
        private volatile boolean created = false;

        @Override
        public void initialize() throws IOException {
          // fast path without locking
          if (created) {
            return;
          }
          synchronized (this) {
            // check again under the lock, another thread may have finished in the meantime
            if (created) {
              return;
            }
            if (services.getTableDescriptors().exists(replicationQueueTableName)) {
              created = true;
              return;
            }
            long procId = services.createSystemTable(ReplicationStorageFactory
              .createReplicationQueueTableDescriptor(replicationQueueTableName));
            ProcedureExecutor<MasterProcedureEnv> procExec = services.getMasterProcedureExecutor();
            ProcedureSyncWait.waitFor(procExec.getEnvironment(), TimeUnit.MINUTES.toMillis(1),
              "Creating table " + replicationQueueTableName, () -> procExec.isFinished(procId));
            // NOTE(review): created is not set here, so the next call takes the lock again and
            // sets the flag only after seeing that the table exists.
          }
        }
      };
    }
    return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage(
      services.getConnection(), conf, replicationQueueTableName), initializer);
  }
651
652  public static ReplicationPeerManager create(MasterServices services, String clusterId)
653    throws ReplicationException, IOException {
654    Configuration conf = services.getConfiguration();
655    FileSystem fs = services.getMasterFileSystem().getFileSystem();
656    ZKWatcher zk = services.getZooKeeper();
657    ReplicationPeerStorage peerStorage =
658      ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
659    Pair<ReplicationQueueStorage, ReplicationQueueStorageInitializer> pair =
660      createReplicationQueueStorage(services);
661    ReplicationQueueStorage queueStorage = pair.getFirst();
662    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
663    for (String peerId : peerStorage.listPeerIds()) {
664      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
665      if (
666        ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
667          .equals(peerConfig.getReplicationEndpointImpl())
668      ) {
669        // we do not use this endpoint for region replication any more, see HBASE-26233
670        LOG.info("Legacy region replication peer found, removing: {}", peerConfig);
671        // do it asynchronous to not block the start up of HMaster
672        new Thread("Remove legacy replication peer " + peerId) {
673
674          @Override
675          public void run() {
676            try {
677              // need to delete two times to make sure we delete all the queues, see the comments in
678              // above
679              // removeAllQueues method for more details.
680              queueStorage.removeAllQueues(peerId);
681              queueStorage.removeAllQueues(peerId);
682              // delete queue first and then peer, because we use peer as a flag.
683              peerStorage.removePeer(peerId);
684            } catch (Exception e) {
685              LOG.warn("Failed to delete legacy replication peer {}", peerId);
686            }
687          }
688        }.start();
689        continue;
690      }
691      peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
692      peerStorage.updatePeerConfig(peerId, peerConfig);
693      boolean enabled = peerStorage.isPeerEnabled(peerId);
694      SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
695      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
696    }
697    return new ReplicationPeerManager(fs, zk, peerStorage, queueStorage, peers, conf, clusterId,
698      pair.getSecond());
699  }
700
701  /**
702   * For replication peer cluster key or endpoint class, null and empty string is same. So here
703   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
704   */
705  private boolean isStringEquals(String s1, String s2) {
706    if (StringUtils.isBlank(s1)) {
707      return StringUtils.isBlank(s2);
708    }
709    return s1.equals(s2);
710  }
711
712  @Override
713  public void onConfigurationChange(Configuration conf) {
714    this.conf = conf;
715    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
716  }
717
718  private ReplicationQueueData convert(ZkReplicationQueueData zkData) {
719    Map<String, ReplicationGroupOffset> groupOffsets = new HashMap<>();
720    zkData.getWalOffsets().forEach((wal, offset) -> {
721      String walGroup = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
722      groupOffsets.compute(walGroup, (k, oldOffset) -> {
723        if (oldOffset == null) {
724          return new ReplicationGroupOffset(wal, offset);
725        }
726        // we should record the first wal's offset
727        long oldWalTs = AbstractFSWALProvider.getTimestamp(oldOffset.getWal());
728        long walTs = AbstractFSWALProvider.getTimestamp(wal);
729        if (walTs < oldWalTs) {
730          return new ReplicationGroupOffset(wal, offset);
731        }
732        return oldOffset;
733      });
734    });
735    return new ReplicationQueueData(zkData.getQueueId(), ImmutableMap.copyOf(groupOffsets));
736  }
737
738  private void migrateQueues(ZKReplicationQueueStorageForMigration oldQueueStorage)
739    throws Exception {
740    MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> iter =
741      oldQueueStorage.listAllQueues();
742    for (;;) {
743      Pair<ServerName, List<ZkReplicationQueueData>> pair = iter.next();
744      if (pair == null) {
745        return;
746      }
747      queueStorage.batchUpdateQueues(pair.getFirst(),
748        pair.getSecond().stream().filter(data -> peers.containsKey(data.getQueueId().getPeerId()))
749          .map(this::convert).collect(Collectors.toList()));
750    }
751  }
752
753  private void migrateLastPushedSeqIds(ZKReplicationQueueStorageForMigration oldQueueStorage)
754    throws Exception {
755    MigrationIterator<List<ZkLastPushedSeqId>> iter = oldQueueStorage.listAllLastPushedSeqIds();
756    for (;;) {
757      List<ZkLastPushedSeqId> list = iter.next();
758      if (list == null) {
759        return;
760      }
761      queueStorage.batchUpdateLastSequenceIds(list.stream()
762        .filter(data -> peers.containsKey(data.getPeerId())).collect(Collectors.toList()));
763    }
764  }
765
766  private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStorage)
767    throws Exception {
768    MigrationIterator<Pair<String, List<String>>> iter = oldQueueStorage.listAllHFileRefs();
769    for (;;) {
770      Pair<String, List<String>> pair = iter.next();
771      if (pair == null) {
772        return;
773      }
774      if (peers.containsKey(pair.getFirst())) {
775        queueStorage.batchUpdateHFileRefs(pair.getFirst(), pair.getSecond());
776      }
777    }
778  }
779
780  private interface ExceptionalRunnable {
781    void run() throws Exception;
782  }
783
784  private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
785    CompletableFuture<?> future = new CompletableFuture<>();
786    executor.execute(() -> {
787      try {
788        task.run();
789        future.complete(null);
790      } catch (Exception e) {
791        future.completeExceptionally(e);
792      }
793    });
794    return future;
795  }
796
797  /**
798   * Submit the migration tasks to the given {@code executor}.
799   */
800  CompletableFuture<Void> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
801    // the replication queue table creation is asynchronous and will be triggered by addPeer, so
802    // here we need to manually initialize it since we will not call addPeer.
803    try {
804      initializeQueueStorage();
805    } catch (IOException e) {
806      return FutureUtils.failedFuture(e);
807    }
808    ZKReplicationQueueStorageForMigration oldStorage =
809      new ZKReplicationQueueStorageForMigration(zookeeper, conf);
810    return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
811      runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
812      runAsync(() -> migrateHFileRefs(oldStorage), executor));
813  }
814}