001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.regex.Pattern;
030import java.util.stream.Collectors;
031import org.apache.commons.lang3.StringUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
036import org.apache.hadoop.hbase.ServerName;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
039import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
040import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
041import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
042import org.apache.hadoop.hbase.replication.ReplicationException;
043import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
044import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
045import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
046import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
047import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
048import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
049import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
050import org.apache.hadoop.hbase.replication.ReplicationUtils;
051import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
052import org.apache.hadoop.hbase.zookeeper.ZKConfig;
053import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
054import org.apache.yetus.audience.InterfaceAudience;
055import org.apache.zookeeper.KeeperException;
056
057/**
058 * Manages and performs all replication admin operations.
059 * <p>
060 * Used to add/remove a replication peer.
061 */
062@InterfaceAudience.Private
063public class ReplicationPeerManager {
064
065  private final ReplicationPeerStorage peerStorage;
066
067  private final ReplicationQueueStorage queueStorage;
068
069  private final ConcurrentMap<String, ReplicationPeerDescription> peers;
070
071  private final String clusterId;
072
073  private final Configuration conf;
074
075  ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
076    ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
077    this.peerStorage = peerStorage;
078    this.queueStorage = queueStorage;
079    this.peers = peers;
080    this.conf = conf;
081    this.clusterId = clusterId;
082  }
083
084  private void checkQueuesDeleted(String peerId)
085    throws ReplicationException, DoNotRetryIOException {
086    for (ServerName replicator : queueStorage.getListOfReplicators()) {
087      List<String> queueIds = queueStorage.getAllQueues(replicator);
088      for (String queueId : queueIds) {
089        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
090        if (queueInfo.getPeerId().equals(peerId)) {
091          throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + ", replicator: "
092            + replicator + ", queueId: " + queueId);
093        }
094      }
095    }
096    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
097      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
098    }
099  }
100
101  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
102    throws DoNotRetryIOException, ReplicationException {
103    if (peerId.contains("-")) {
104      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
105    }
106    checkPeerConfig(peerConfig);
107    if (peers.containsKey(peerId)) {
108      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
109    }
110    // make sure that there is no queues with the same peer id. This may happen when we create a
111    // peer with the same id with a old deleted peer. If the replication queues for the old peer
112    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
113    // file may also be replicated.
114    checkQueuesDeleted(peerId);
115  }
116
117  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
118    ReplicationPeerDescription desc = peers.get(peerId);
119    if (desc == null) {
120      throw new ReplicationPeerNotFoundException(peerId);
121    }
122    return desc;
123  }
124
125  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
126    return checkPeerExists(peerId).getPeerConfig();
127  }
128
129  void preEnablePeer(String peerId) throws DoNotRetryIOException {
130    ReplicationPeerDescription desc = checkPeerExists(peerId);
131    if (desc.isEnabled()) {
132      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
133    }
134  }
135
136  void preDisablePeer(String peerId) throws DoNotRetryIOException {
137    ReplicationPeerDescription desc = checkPeerExists(peerId);
138    if (!desc.isEnabled()) {
139      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
140    }
141  }
142
143  /**
144   * Return the old peer description. Can never be null.
145   */
146  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
147    throws DoNotRetryIOException {
148    checkPeerConfig(peerConfig);
149    ReplicationPeerDescription desc = checkPeerExists(peerId);
150    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
151    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
152      throw new DoNotRetryIOException(
153        "Changing the cluster key on an existing peer is not allowed. Existing key '"
154          + oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '"
155          + peerConfig.getClusterKey() + "'");
156    }
157
158    if (
159      !isStringEquals(peerConfig.getReplicationEndpointImpl(),
160        oldPeerConfig.getReplicationEndpointImpl())
161    ) {
162      throw new DoNotRetryIOException("Changing the replication endpoint implementation class "
163        + "on an existing peer is not allowed. Existing class '"
164        + oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId
165        + " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
166    }
167    return desc;
168  }
169
170  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
171    throws ReplicationException {
172    if (peers.containsKey(peerId)) {
173      // this should be a retry, just return
174      return;
175    }
176    peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
177    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
178    peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
179    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
180  }
181
182  public void removePeer(String peerId) throws ReplicationException {
183    if (!peers.containsKey(peerId)) {
184      // this should be a retry, just return
185      return;
186    }
187    peerStorage.removePeer(peerId);
188    peers.remove(peerId);
189  }
190
191  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
192    ReplicationPeerDescription desc = peers.get(peerId);
193    if (desc.isEnabled() == enabled) {
194      // this should be a retry, just return
195      return;
196    }
197    peerStorage.setPeerState(peerId, enabled);
198    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
199  }
200
201  public void enablePeer(String peerId) throws ReplicationException {
202    setPeerState(peerId, true);
203  }
204
205  public void disablePeer(String peerId) throws ReplicationException {
206    setPeerState(peerId, false);
207  }
208
209  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
210    throws ReplicationException {
211    // the checking rules are too complicated here so we give up checking whether this is a retry.
212    ReplicationPeerDescription desc = peers.get(peerId);
213    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
214    ReplicationPeerConfigBuilder newPeerConfigBuilder =
215      ReplicationPeerConfig.newBuilder(peerConfig);
216    // we need to use the new conf to overwrite the old one.
217    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
218    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
219    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
220    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
221    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
222    peerStorage.updatePeerConfig(peerId, newPeerConfig);
223    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
224  }
225
226  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
227    if (pattern == null) {
228      return new ArrayList<>(peers.values());
229    }
230    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
231      .collect(Collectors.toList());
232  }
233
234  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
235    ReplicationPeerDescription desc = peers.get(peerId);
236    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
237  }
238
239  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
240    queueStorage.removeLastSequenceIds(peerId);
241  }
242
243  void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
244    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
245    // on-going when the refresh peer config procedure is done, if a RS which has already been
246    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
247    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
248    // source, then the queue will leave there until the another RS detects the crash and helps
249    // removing the queue.
250    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
251    // claiming, it will either under the old RS or under the new RS, and a queue can only be
252    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
253    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
254    // unless it has already been removed by others.
255    ReplicationUtils.removeAllQueues(queueStorage, peerId);
256    ReplicationUtils.removeAllQueues(queueStorage, peerId);
257    queueStorage.removePeerFromHFileRefs(peerId);
258  }
259
260  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
261    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
262    ReplicationEndpoint endpoint = null;
263    if (!StringUtils.isBlank(replicationEndpointImpl)) {
264      try {
265        // try creating a instance
266        endpoint = Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class)
267          .getDeclaredConstructor().newInstance();
268      } catch (Throwable e) {
269        throw new DoNotRetryIOException(
270          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
271          e);
272      }
273    }
274    // Endpoints implementing HBaseReplicationEndpoint need to check cluster key
275    if (endpoint == null || endpoint instanceof HBaseReplicationEndpoint) {
276      checkClusterKey(peerConfig.getClusterKey());
277      // Check if endpoint can replicate to the same cluster
278      if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
279        checkSameClusterKey(peerConfig.getClusterKey());
280      }
281    }
282
283    if (peerConfig.replicateAllUserTables()) {
284      // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
285      // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
286      // cluster.
287      if (
288        (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
289          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())
290      ) {
291        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
292          + "when you want replicate all cluster");
293      }
294      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
295        peerConfig.getExcludeTableCFsMap());
296    } else {
297      // If replicate_all flag is false, it means all user tables can't be replicated to peer
298      // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
299      // cluster.
300      if (
301        (peerConfig.getExcludeNamespaces() != null && !peerConfig.getExcludeNamespaces().isEmpty())
302          || (peerConfig.getExcludeTableCFsMap() != null
303            && !peerConfig.getExcludeTableCFsMap().isEmpty())
304      ) {
305        throw new DoNotRetryIOException(
306          "Need clean exclude-namespaces or exclude-table-cfs config firstly"
307            + " when replicate_all flag is false");
308      }
309      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
310        peerConfig.getTableCFsMap());
311    }
312
313    checkConfiguredWALEntryFilters(peerConfig);
314  }
315
316  /**
317   * Set a namespace in the peer config means that all tables in this namespace will be replicated
318   * to the peer cluster.
319   * <ol>
320   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
321   * the peer config.</li>
322   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
323   * config.</li>
324   * </ol>
325   * <p>
326   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
327   * replicated to the peer cluster.
328   * <ol>
329   * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
330   * this namespace to the peer config.</li>
331   * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
332   * exclude namespace.</li>
333   * </ol>
334   */
335  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
336    Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
337    if (namespaces == null || namespaces.isEmpty()) {
338      return;
339    }
340    if (tableCfs == null || tableCfs.isEmpty()) {
341      return;
342    }
343    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
344      TableName table = entry.getKey();
345      if (namespaces.contains(table.getNamespaceAsString())) {
346        throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces "
347          + table.getNamespaceAsString() + " in peer config");
348      }
349    }
350  }
351
352  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
353    throws DoNotRetryIOException {
354    String filterCSV = peerConfig.getConfiguration()
355      .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
356    if (filterCSV != null && !filterCSV.isEmpty()) {
357      String[] filters = filterCSV.split(",");
358      for (String filter : filters) {
359        try {
360          Class.forName(filter).getDeclaredConstructor().newInstance();
361        } catch (Exception e) {
362          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter
363            + " could not be created. Failing add/update peer operation.", e);
364        }
365      }
366    }
367  }
368
369  private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
370    try {
371      ZKConfig.validateClusterKey(clusterKey);
372    } catch (IOException e) {
373      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
374    }
375  }
376
377  private void checkSameClusterKey(String clusterKey) throws DoNotRetryIOException {
378    String peerClusterId = "";
379    try {
380      // Create the peer cluster config for get peer cluster id
381      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
382      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
383        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
384      }
385    } catch (IOException | KeeperException e) {
386      throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
387    }
388    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
389    // peerClusterId value, which is the same as the source clusterId
390    if (clusterId.equals(peerClusterId)) {
391      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
392        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
393    }
394  }
395
396  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
397    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
398      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
399      .collect(Collectors.toList());
400  }
401
402  public ReplicationQueueStorage getQueueStorage() {
403    return queueStorage;
404  }
405
406  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
407    throws ReplicationException {
408    ReplicationPeerStorage peerStorage =
409      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
410    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
411    for (String peerId : peerStorage.listPeerIds()) {
412      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
413
414      peerConfig = ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, peerConfig);
415      peerStorage.updatePeerConfig(peerId, peerConfig);
416      boolean enabled = peerStorage.isPeerEnabled(peerId);
417      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
418    }
419    return new ReplicationPeerManager(peerStorage,
420      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
421  }
422
423  /**
424   * For replication peer cluster key or endpoint class, null and empty string is same. So here
425   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
426   */
427  private boolean isStringEquals(String s1, String s2) {
428    if (StringUtils.isBlank(s1)) {
429      return StringUtils.isBlank(s2);
430    }
431    return s1.equals(s2);
432  }
433}