/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Manages and performs all replication admin operations.
 * <p>
 * Used to add/remove a replication peer.
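 * <p>
 * A rough usage sketch (illustrative only; the {@code zk} watcher, {@code conf} and
 * {@code peerConfig} are assumed to be prepared by the caller, typically the master):
 *
 * <pre>
 * ReplicationPeerManager manager = ReplicationPeerManager.create(zk, conf);
 * manager.preAddPeer("1", peerConfig);
 * manager.addPeer("1", peerConfig, true);
 * manager.disablePeer("1");
 * manager.removePeer("1");
 * </pre>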
 */
@InterfaceAudience.Private
public class ReplicationPeerManager {

  private final ReplicationPeerStorage peerStorage;

  private final ReplicationQueueStorage queueStorage;

  private final ConcurrentMap<String, ReplicationPeerDescription> peers;

  ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
      ConcurrentMap<String, ReplicationPeerDescription> peers) {
    this.peerStorage = peerStorage;
    this.queueStorage = queueStorage;
    this.peers = peers;
  }

  private void checkQueuesDeleted(String peerId)
      throws ReplicationException, DoNotRetryIOException {
    for (ServerName replicator : queueStorage.getListOfReplicators()) {
      List<String> queueIds = queueStorage.getAllQueues(replicator);
      for (String queueId : queueIds) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        if (queueInfo.getPeerId().equals(peerId)) {
          throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
            ", replicator: " + replicator + ", queueId: " + queueId);
        }
      }
    }
    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
    }
  }

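  /**
   * Pre-check before a peer is added: the peer id must not contain a '-', the config must be
   * valid, the peer must not already exist, and no replication queues of an old peer with the
   * same id may be left over.
   */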
  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException, ReplicationException {
    if (peerId.contains("-")) {
      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
    }
    checkPeerConfig(peerConfig);
    if (peers.containsKey(peerId)) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
    }
    // make sure that there are no queues with the same peer id. This may happen when we create a
    // peer with the same id as an old, deleted peer. If the replication queues for the old peer
    // have not been cleaned up yet, we should not create the new peer, otherwise the old wal
    // files may also be replicated.
    checkQueuesDeleted(peerId);
  }

  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc == null) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist");
    }
    return desc;
  }

  ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException {
    return checkPeerExists(peerId).getPeerConfig();
  }

  void preEnablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
    }
  }

  void preDisablePeer(String peerId) throws DoNotRetryIOException {
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    if (!desc.isEnabled()) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
    }
  }

  /**
   * Return the old peer description. Can never be null.
   */
  ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException {
    checkPeerConfig(peerConfig);
    ReplicationPeerDescription desc = checkPeerExists(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) {
      throw new DoNotRetryIOException(
        "Changing the cluster key on an existing peer is not allowed. Existing key '" +
          oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
          peerConfig.getClusterKey() + "'");
    }

    if (!isStringEquals(peerConfig.getReplicationEndpointImpl(),
      oldPeerConfig.getReplicationEndpointImpl())) {
      throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
        "on an existing peer is not allowed. Existing class '" +
        oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
        " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
    }
    return desc;
  }

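  /**
   * Add a new replication peer and store it in the in-memory map. If a peer with the same id
   * already exists, the call is treated as a retry and returns without error.
   */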
  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
      throws ReplicationException {
    if (peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
    peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
  }

  public void removePeer(String peerId) throws ReplicationException {
    if (!peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peerStorage.removePeer(peerId);
    peers.remove(peerId);
  }

  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.isEnabled() == enabled) {
      // this should be a retry, just return
      return;
    }
    peerStorage.setPeerState(peerId, enabled);
    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
  }

  public void enablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, true);
  }

  public void disablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, false);
  }

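  /**
   * Update the config of an existing peer. The old and new configuration maps are merged: entries
   * only present in the old config are kept, while entries present in both take the new value.
   */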
  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
      throws ReplicationException {
    // the checking rules are too complicated here so we give up checking whether this is a retry.
    ReplicationPeerDescription desc = peers.get(peerId);
    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
    ReplicationPeerConfigBuilder newPeerConfigBuilder =
        ReplicationPeerConfig.newBuilder(peerConfig);
    // we need to use the new conf to overwrite the old one.
    newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
    newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
    ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
    peerStorage.updatePeerConfig(peerId, newPeerConfig);
    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
  }

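  /**
   * Return the descriptions of all peers, or only of those whose peer id matches the given
   * pattern when the pattern is not null.
   */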
  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
    if (pattern == null) {
      return new ArrayList<>(peers.values());
    }
    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
        .collect(Collectors.toList());
  }

  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
    ReplicationPeerDescription desc = peers.get(peerId);
    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
  }

  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
    queueStorage.removeLastSequenceIds(peerId);
  }

  void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    // Here we need two passes to deal with a race with claimQueue. A claimQueue call may still be
    // on-going when the refresh peer config procedure is done. If a RS which has already been
    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
    // the scan here, and if the RS which claimed the queue crashed before creating the recovered
    // source, then the queue will be left there until another RS detects the crash and helps
    // removing the queue.
    // A two pass scan solves the problem. The queue will not disappear during the claiming: it
    // will either be under the old RS or under the new RS, and a queue can only be claimed once
    // after the refresh peer procedure is done (the next claimQueue will just delete it), so a
    // two pass scan will finally find the queue and remove it, unless it has already been removed
    // by others.
    ReplicationUtils.removeAllQueues(queueStorage, peerId);
    ReplicationUtils.removeAllQueues(queueStorage, peerId);
    queueStorage.removePeerFromHFileRefs(peerId);
  }

  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
    boolean checkClusterKey = true;
    if (!StringUtils.isBlank(replicationEndpointImpl)) {
      // try creating an instance
      ReplicationEndpoint endpoint;
      try {
        endpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (Throwable e) {
        throw new DoNotRetryIOException(
          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
          e);
      }
      // only check the cluster key if the endpoint is an HBaseInterClusterReplicationEndpoint
      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
        checkClusterKey = false;
      }
    }
    if (checkClusterKey) {
      checkClusterKey(peerConfig.getClusterKey());
    }

    if (peerConfig.replicateAllUserTables()) {
      // If the replicate_all flag is true, all user tables will be replicated to the peer
      // cluster. Only exclude-namespaces or exclude-table-cfs, which list what must not be
      // replicated, may be configured.
      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
        throw new DoNotRetryIOException("Need to clean up the namespaces or table-cfs config "
            + "first when the replicate_all flag is true");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
        peerConfig.getExcludeTableCFsMap());
    } else {
      // If the replicate_all flag is false, no user table is replicated by default. Only
      // namespaces or table-cfs, which list what will be replicated, may be configured.
      if ((peerConfig.getExcludeNamespaces() != null
          && !peerConfig.getExcludeNamespaces().isEmpty())
          || (peerConfig.getExcludeTableCFsMap() != null
              && !peerConfig.getExcludeTableCFsMap().isEmpty())) {
        throw new DoNotRetryIOException(
            "Need to clean up the exclude-namespaces or exclude-table-cfs config first"
                + " when the replicate_all flag is false");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
        peerConfig.getTableCFsMap());
    }

    checkConfiguredWALEntryFilters(peerConfig);
  }

  /**
   * Setting a namespace in the peer config means that all tables in that namespace will be
   * replicated to the peer cluster.
   * <ol>
   * <li>If the peer config already contains a namespace, then adding any table of that namespace
   * to the peer config is not allowed.</li>
   * <li>If the peer config already contains a table, then adding that table's namespace to the
   * peer config is not allowed.</li>
   * </ol>
   * <p>
   * Setting an exclude namespace in the peer config means that all tables in that namespace
   * cannot be replicated to the peer cluster.
   * <ol>
   * <li>If the peer config already contains an exclude namespace, then adding any exclude table
   * of that namespace to the peer config is not allowed.</li>
   * <li>If the peer config already contains an exclude table, then adding that table's namespace
   * as an exclude namespace is not allowed.</li>
   * </ol>
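   * <p>
   * An illustrative example: a peer config that contains namespace {@code ns1} together with
   * table {@code ns1:t1} in its table-cfs map is rejected, because the namespace entry already
   * covers every table in {@code ns1}.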
   */
  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
      Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
    if (namespaces == null || namespaces.isEmpty()) {
      return;
    }
    if (tableCfs == null || tableCfs.isEmpty()) {
      return;
    }
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      if (namespaces.contains(table.getNamespaceAsString())) {
        throw new DoNotRetryIOException("Table-cfs " + table + " conflicts with namespace " +
          table.getNamespaceAsString() + " in peer config");
      }
    }
  }

  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException {
    String filterCSV = peerConfig.getConfiguration()
        .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
    if (filterCSV != null && !filterCSV.isEmpty()) {
      String[] filters = filterCSV.split(",");
      for (String filter : filters) {
        try {
          Class.forName(filter).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
            " could not be created. Failing add/update peer operation.", e);
        }
      }
    }
  }

  private void checkClusterKey(String clusterKey) throws DoNotRetryIOException {
    try {
      ZKConfig.validateClusterKey(clusterKey);
    } catch (IOException e) {
      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e);
    }
  }

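  /**
   * Return the ids of all serial replication peers whose config covers the given table.
   */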
  public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
    return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
      .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
      .collect(Collectors.toList());
  }

  public ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }

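  /**
   * Create a ReplicationPeerManager and load all existing peers from the peer storage into its
   * in-memory map.
   */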
  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
      throws ReplicationException {
    ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
    for (String peerId : peerStorage.listPeerIds()) {
      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
      boolean enabled = peerStorage.isPeerEnabled(peerId);
      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
    }
    return new ReplicationPeerManager(peerStorage,
      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
  }

  /**
   * For a replication peer's cluster key or endpoint class, null and an empty string are treated
   * as the same, so we cannot use {@link StringUtils#equals(CharSequence, CharSequence)} here.
   */
  private boolean isStringEquals(String s1, String s2) {
    if (StringUtils.isBlank(s1)) {
      return StringUtils.isBlank(s2);
    }
    return s1.equals(s2);
  }
}