001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.regex.Pattern;
030import java.util.stream.Collectors;
031import org.apache.commons.lang3.StringUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.DoNotRetryIOException;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
038import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
039import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
040import org.apache.hadoop.hbase.replication.ReplicationException;
041import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
042import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
043import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
044import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
045import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
046import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
047import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
048import org.apache.hadoop.hbase.replication.ReplicationUtils;
049import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
050import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
051import org.apache.hadoop.hbase.zookeeper.ZKConfig;
052import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.apache.zookeeper.KeeperException;
055
056/**
057 * Manages and performs all replication admin operations.
058 * <p>
059 * Used to add/remove a replication peer.
060 */
061@InterfaceAudience.Private
062public class ReplicationPeerManager {
063
064  private final ReplicationPeerStorage peerStorage;
065
066  private final ReplicationQueueStorage queueStorage;
067
068  private final ConcurrentMap<String, ReplicationPeerDescription> peers;
069
070  private final String clusterId;
071
072  private final Configuration conf;
073
074  ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
075    ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
076    this.peerStorage = peerStorage;
077    this.queueStorage = queueStorage;
078    this.peers = peers;
079    this.conf = conf;
080    this.clusterId = clusterId;
081  }
082
083  private void checkQueuesDeleted(String peerId)
084      throws ReplicationException, DoNotRetryIOException {
085    for (ServerName replicator : queueStorage.getListOfReplicators()) {
086      List<String> queueIds = queueStorage.getAllQueues(replicator);
087      for (String queueId : queueIds) {
088        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
089        if (queueInfo.getPeerId().equals(peerId)) {
090          throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
091            ", replicator: " + replicator + ", queueId: " + queueId);
092        }
093      }
094    }
095    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
096      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
097    }
098  }
099
100  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
101      throws DoNotRetryIOException, ReplicationException {
102    if (peerId.contains("-")) {
103      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
104    }
105    checkPeerConfig(peerConfig);
106    if (peers.containsKey(peerId)) {
107      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
108    }
109    // make sure that there is no queues with the same peer id. This may happen when we create a
110    // peer with the same id with a old deleted peer. If the replication queues for the old peer
111    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
112    // file may also be replicated.
113    checkQueuesDeleted(peerId);
114  }
115
116  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
117    ReplicationPeerDescription desc = peers.get(peerId);
118    if (desc == null) {
119      throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist");
120    }
121    return desc;
122  }
123
124  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
125    return checkPeerExists(peerId).getPeerConfig();
126  }
127
128  void preEnablePeer(String peerId) throws DoNotRetryIOException {
129    ReplicationPeerDescription desc = checkPeerExists(peerId);
130    if (desc.isEnabled()) {
131      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
132    }
133  }
134
135  void preDisablePeer(String peerId) throws DoNotRetryIOException {
136    ReplicationPeerDescription desc = checkPeerExists(peerId);
137    if (!desc.isEnabled()) {
138      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
139    }
140  }
141
142  /**
143   * Return the old peer description. Can never be null.
144   */
145  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
146      throws DoNotRetryIOException {
147    checkPeerConfig(peerConfig);
148    ReplicationPeerDescription desc = checkPeerExists(peerId);
149    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
150    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
151      throw new DoNotRetryIOException(
152        "Changing the cluster key on an existing peer is not allowed. Existing key '" +
153          oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
154          peerConfig.getClusterKey() + "'");
155    }
156
157    if (!isStringEquals(peerConfig.getReplicationEndpointImpl(),
158      oldPeerConfig.getReplicationEndpointImpl())) {
159      throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
160        "on an existing peer is not allowed. Existing class '" +
161        oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
162        " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
163    }
164    return desc;
165  }
166
167  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
168      throws ReplicationException {
169    if (peers.containsKey(peerId)) {
170      // this should be a retry, just return
171      return;
172    }
173    peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
174    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
175    peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
176    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
177  }
178
179  public void removePeer(String peerId) throws ReplicationException {
180    if (!peers.containsKey(peerId)) {
181      // this should be a retry, just return
182      return;
183    }
184    peerStorage.removePeer(peerId);
185    peers.remove(peerId);
186  }
187
188  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
189    ReplicationPeerDescription desc = peers.get(peerId);
190    if (desc.isEnabled() == enabled) {
191      // this should be a retry, just return
192      return;
193    }
194    peerStorage.setPeerState(peerId, enabled);
195    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
196  }
197
198  public void enablePeer(String peerId) throws ReplicationException {
199    setPeerState(peerId, true);
200  }
201
202  public void disablePeer(String peerId) throws ReplicationException {
203    setPeerState(peerId, false);
204  }
205
206  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
207      throws ReplicationException {
208    // the checking rules are too complicated here so we give up checking whether this is a retry.
209    ReplicationPeerDescription desc = peers.get(peerId);
210    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
211    ReplicationPeerConfigBuilder newPeerConfigBuilder =
212        ReplicationPeerConfig.newBuilder(peerConfig);
213    // we need to use the new conf to overwrite the old one.
214    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
215    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
216    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
217    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
218    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
219    peerStorage.updatePeerConfig(peerId, newPeerConfig);
220    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
221  }
222
223  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
224    if (pattern == null) {
225      return new ArrayList<>(peers.values());
226    }
227    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
228        .collect(Collectors.toList());
229  }
230
231  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
232    ReplicationPeerDescription desc = peers.get(peerId);
233    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
234  }
235
236  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
237    queueStorage.removeLastSequenceIds(peerId);
238  }
239
240  void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
241    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
242    // on-going when the refresh peer config procedure is done, if a RS which has already been
243    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
244    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
245    // source, then the queue will leave there until the another RS detects the crash and helps
246    // removing the queue.
247    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
248    // claiming, it will either under the old RS or under the new RS, and a queue can only be
249    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
250    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
251    // unless it has already been removed by others.
252    ReplicationUtils.removeAllQueues(queueStorage, peerId);
253    ReplicationUtils.removeAllQueues(queueStorage, peerId);
254    queueStorage.removePeerFromHFileRefs(peerId);
255  }
256
257  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
258    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
259    ReplicationEndpoint endpoint = null;
260    if (!StringUtils.isBlank(replicationEndpointImpl)) {
261      try {
262        // try creating a instance
263        endpoint = Class.forName(replicationEndpointImpl)
264          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
265      } catch (Throwable e) {
266        throw new DoNotRetryIOException(
267          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
268          e);
269      }
270    }
271    // Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
272    if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
273      checkClusterKey(peerConfig.getClusterKey());
274    }
275    // Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
276    if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
277      checkClusterId(peerConfig.getClusterKey());
278    }
279
280    if (peerConfig.replicateAllUserTables()) {
281      // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
282      // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
283      // cluster.
284      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
285          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
286        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly "
287            + "when you want replicate all cluster");
288      }
289      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
290        peerConfig.getExcludeTableCFsMap());
291    } else {
292      // If replicate_all flag is false, it means all user tables can't be replicated to peer
293      // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
294      // cluster.
295      if ((peerConfig.getExcludeNamespaces() != null
296          && !peerConfig.getExcludeNamespaces().isEmpty())
297          || (peerConfig.getExcludeTableCFsMap() != null
298              && !peerConfig.getExcludeTableCFsMap().isEmpty())) {
299        throw new DoNotRetryIOException(
300            "Need clean exclude-namespaces or exclude-table-cfs config firstly"
301                + " when replicate_all flag is false");
302      }
303      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
304        peerConfig.getTableCFsMap());
305    }
306
307    checkConfiguredWALEntryFilters(peerConfig);
308  }
309
310  /**
311   * Set a namespace in the peer config means that all tables in this namespace will be replicated
312   * to the peer cluster.
313   * <ol>
314   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
315   * the peer config.</li>
316   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
317   * config.</li>
318   * </ol>
319   * <p>
320   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
321   * replicated to the peer cluster.
322   * <ol>
323   * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
324   * this namespace to the peer config.</li>
325   * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
326   * exclude namespace.</li>
327   * </ol>
328   */
329  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
330      Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
331    if (namespaces == null || namespaces.isEmpty()) {
332      return;
333    }
334    if (tableCfs == null || tableCfs.isEmpty()) {
335      return;
336    }
337    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
338      TableName table = entry.getKey();
339      if (namespaces.contains(table.getNamespaceAsString())) {
340        throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " +
341          table.getNamespaceAsString() + " in peer config");
342      }
343    }
344  }
345
346  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
347      throws DoNotRetryIOException {
348    String filterCSV = peerConfig.getConfiguration()
349        .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
350    if (filterCSV != null && !filterCSV.isEmpty()) {
351      String[] filters = filterCSV.split(",");
352      for (String filter : filters) {
353        try {
354          Class.forName(filter).getDeclaredConstructor().newInstance();
355        } catch (Exception e) {
356          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
357            " could not be created. Failing add/update peer operation.", e);
358        }
359      }
360    }
361  }
362
363  private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
364    try {
365      ZKConfig.validateClusterKey(clusterKey);
366    } catch (IOException e) {
367      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
368    }
369  }
370
371  private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
372    String peerClusterId = "";
373    try {
374      // Create the peer cluster config for get peer cluster id
375      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
376      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
377        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
378      }
379    } catch (IOException | KeeperException e) {
380      throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
381    }
382    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
383    // peerClusterId value, which is the same as the source clusterId
384    if (clusterId.equals(peerClusterId)) {
385      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
386        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
387    }
388  }
389
390  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
391    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
392      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
393      .collect(Collectors.toList());
394  }
395
396  public ReplicationQueueStorage getQueueStorage() {
397    return queueStorage;
398  }
399
400  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
401      throws ReplicationException {
402    ReplicationPeerStorage peerStorage =
403      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
404    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
405    for (String peerId : peerStorage.listPeerIds()) {
406      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
407
408      peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
409      peerStorage.updatePeerConfig(peerId, peerConfig);
410      boolean enabled = peerStorage.isPeerEnabled(peerId);
411      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
412    }
413    return new ReplicationPeerManager(peerStorage,
414      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
415  }
416
417  /**
418   * For replication peer cluster key or endpoint class, null and empty string is same. So here
419   * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly.
420   */
421  private boolean isStringEquals(String s1, String s2) {
422    if (StringUtils.isBlank(s1)) {
423      return StringUtils.isBlank(s2);
424    }
425    return s1.equals(s2);
426  }
427}