001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.regex.Pattern;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
034import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
035import org.apache.hadoop.hbase.replication.ReplicationException;
036import org.apache.hadoop.hbase.replication.ReplicationFactory;
037import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
038import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
039import org.apache.hadoop.hbase.replication.ReplicationPeers;
040import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
041import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
042import org.apache.yetus.audience.InterfaceAudience;
043
044/**
045 * Manages and performs all replication admin operations.
046 * Used to add/remove a replication peer.
047 */
048@InterfaceAudience.Private
049public class ReplicationManager {
050
051  private final Configuration conf;
052  private final ZKWatcher zkw;
053  private final ReplicationQueuesClient replicationQueuesClient;
054  private final ReplicationPeers replicationPeers;
055
056  public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable)
057      throws IOException {
058    this.conf = conf;
059    this.zkw = zkw;
060    try {
061      this.replicationQueuesClient = ReplicationFactory
062          .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
063      this.replicationQueuesClient.init();
064      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
065        this.replicationQueuesClient, abortable);
066      this.replicationPeers.init();
067    } catch (Exception e) {
068      throw new IOException("Failed to construct ReplicationManager", e);
069    }
070  }
071
072  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
073      throws ReplicationException, IOException {
074    checkPeerConfig(peerConfig);
075    replicationPeers.registerPeer(peerId, peerConfig, enabled);
076    replicationPeers.peerConnected(peerId);
077  }
078
079  public void removeReplicationPeer(String peerId) throws ReplicationException {
080    replicationPeers.peerDisconnected(peerId);
081    replicationPeers.unregisterPeer(peerId);
082  }
083
084  public void enableReplicationPeer(String peerId) throws ReplicationException {
085    this.replicationPeers.enablePeer(peerId);
086  }
087
088  public void disableReplicationPeer(String peerId) throws ReplicationException {
089    this.replicationPeers.disablePeer(peerId);
090  }
091
092  public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException,
093      ReplicationPeerNotFoundException {
094    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId);
095    if (peerConfig == null) {
096      throw new ReplicationPeerNotFoundException(peerId);
097    }
098    return peerConfig;
099  }
100
101  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
102      throws ReplicationException, IOException {
103    checkPeerConfig(peerConfig);
104    this.replicationPeers.updatePeerConfig(peerId, peerConfig);
105  }
106
107  public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
108      throws ReplicationException {
109    List<ReplicationPeerDescription> peers = new ArrayList<>();
110    List<String> peerIds = replicationPeers.getAllPeerIds();
111    for (String peerId : peerIds) {
112      if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) {
113        peers.add(new ReplicationPeerDescription(peerId, replicationPeers
114            .getStatusOfPeerFromBackingStore(peerId), replicationPeers
115            .getReplicationPeerConfig(peerId)));
116      }
117    }
118    return peers;
119  }
120
121  /**
122   * If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
123   * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to
124   * peer cluster.
125   *
126   * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
127   * Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
128   */
129  private void checkPeerConfig(ReplicationPeerConfig peerConfig)
130      throws ReplicationException, IOException {
131    if (peerConfig.replicateAllUserTables()) {
132      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
133          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
134        throw new ReplicationException("Need clean namespaces or table-cfs config firstly"
135            + " when replicate_all flag is true");
136      }
137      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
138        peerConfig.getExcludeTableCFsMap());
139    } else {
140      if ((peerConfig.getExcludeNamespaces() != null
141          && !peerConfig.getExcludeNamespaces().isEmpty())
142          || (peerConfig.getExcludeTableCFsMap() != null
143              && !peerConfig.getExcludeTableCFsMap().isEmpty())) {
144        throw new ReplicationException(
145            "Need clean exclude-namespaces or exclude-table-cfs config firstly"
146                + " when replicate_all flag is false");
147      }
148      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
149        peerConfig.getTableCFsMap());
150    }
151    checkConfiguredWALEntryFilters(peerConfig);
152  }
153
154  /**
155   * Set a namespace in the peer config means that all tables in this namespace will be replicated
156   * to the peer cluster.
157   * 1. If peer config already has a namespace, then not allow set any table of this namespace
158   *    to the peer config.
159   * 2. If peer config already has a table, then not allow set this table's namespace to the peer
160   *    config.
161   *
162   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
163   * replicated to the peer cluster.
164   * 1. If peer config already has a exclude namespace, then not allow set any exclude table of
165   *    this namespace to the peer config.
166   * 2. If peer config already has a exclude table, then not allow set this table's namespace
167   *    as a exclude namespace.
168   */
169  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
170      Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
171    if (namespaces == null || namespaces.isEmpty()) {
172      return;
173    }
174    if (tableCfs == null || tableCfs.isEmpty()) {
175      return;
176    }
177    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
178      TableName table = entry.getKey();
179      if (namespaces.contains(table.getNamespaceAsString())) {
180        throw new ReplicationException("Table-cfs " + table + " is conflict with namespaces "
181            + table.getNamespaceAsString() + " in peer config");
182      }
183    }
184  }
185
186  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
187      throws IOException {
188    String filterCSV = peerConfig.getConfiguration().
189        get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
190    if (filterCSV != null && !filterCSV.isEmpty()){
191      String [] filters = filterCSV.split(",");
192      for (String filter : filters) {
193        try {
194          Class clazz = Class.forName(filter);
195          Object o = clazz.getDeclaredConstructor().newInstance();
196        } catch (Exception e) {
197          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
198              " could not be created. Failing add/update " + "peer operation.", e);
199        }
200      }
201    }
202  }
203}