001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.Collection;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.TreeMap;
027import org.apache.commons.lang3.StringUtils;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.yetus.audience.InterfaceAudience;
031
032import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
033
034/**
035 * A configuration for the replication peer cluster.
036 */
037@InterfaceAudience.Public
038public class ReplicationPeerConfig {
039
040  private String clusterKey;
041  private String replicationEndpointImpl;
042  private final Map<byte[], byte[]> peerData;
043  private final Map<String, String> configuration;
044  private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
045  private Set<String> namespaces = null;
046  // Default value is true, means replicate all user tables to peer cluster.
047  private boolean replicateAllUserTables = true;
048  private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
049  private Set<String> excludeNamespaces = null;
050  private long bandwidth = 0;
051  private final boolean serial;
052  // Used by synchronous replication
053  private String remoteWALDir;
054
055  private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
056    this.clusterKey = builder.clusterKey;
057    this.replicationEndpointImpl = builder.replicationEndpointImpl;
058    this.peerData = Collections.unmodifiableMap(builder.peerData);
059    this.configuration = Collections.unmodifiableMap(builder.configuration);
060    this.tableCFsMap =
061      builder.tableCFsMap != null ? unmodifiableTableCFsMap(builder.tableCFsMap) : null;
062    this.namespaces =
063      builder.namespaces != null ? Collections.unmodifiableSet(builder.namespaces) : null;
064    this.replicateAllUserTables = builder.replicateAllUserTables;
065    this.excludeTableCFsMap = builder.excludeTableCFsMap != null
066      ? unmodifiableTableCFsMap(builder.excludeTableCFsMap)
067      : null;
068    this.excludeNamespaces = builder.excludeNamespaces != null
069      ? Collections.unmodifiableSet(builder.excludeNamespaces)
070      : null;
071    this.bandwidth = builder.bandwidth;
072    this.serial = builder.serial;
073    this.remoteWALDir = builder.remoteWALDir;
074  }
075
076  private Map<TableName, List<String>>
077    unmodifiableTableCFsMap(Map<TableName, List<String>> tableCFsMap) {
078    Map<TableName, List<String>> newTableCFsMap = new HashMap<>();
079    tableCFsMap.forEach((table, cfs) -> newTableCFsMap.put(table,
080      cfs != null ? Collections.unmodifiableList(cfs) : null));
081    return Collections.unmodifiableMap(newTableCFsMap);
082  }
083
084  public String getClusterKey() {
085    return clusterKey;
086  }
087
088  public String getReplicationEndpointImpl() {
089    return replicationEndpointImpl;
090  }
091
092  public Map<byte[], byte[]> getPeerData() {
093    return peerData;
094  }
095
096  public Map<String, String> getConfiguration() {
097    return configuration;
098  }
099
100  public Map<TableName, List<String>> getTableCFsMap() {
101    return (Map<TableName, List<String>>) tableCFsMap;
102  }
103
104  public Set<String> getNamespaces() {
105    return this.namespaces;
106  }
107
108  public long getBandwidth() {
109    return this.bandwidth;
110  }
111
112  public boolean replicateAllUserTables() {
113    return this.replicateAllUserTables;
114  }
115
116  public Map<TableName, List<String>> getExcludeTableCFsMap() {
117    return (Map<TableName, List<String>>) excludeTableCFsMap;
118  }
119
120  public Set<String> getExcludeNamespaces() {
121    return this.excludeNamespaces;
122  }
123
124  public String getRemoteWALDir() {
125    return this.remoteWALDir;
126  }
127
128  /**
129   * Use remote wal dir to decide whether a peer is sync replication peer
130   */
131  public boolean isSyncReplication() {
132    return !StringUtils.isBlank(this.remoteWALDir);
133  }
134
135  public static ReplicationPeerConfigBuilder newBuilder() {
136    return new ReplicationPeerConfigBuilderImpl();
137  }
138
139  public boolean isSerial() {
140    return serial;
141  }
142
143  public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
144    ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
145    builder.setClusterKey(peerConfig.getClusterKey())
146      .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
147      .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
148      .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
149      .setReplicateAllUserTables(peerConfig.replicateAllUserTables())
150      .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
151      .setExcludeNamespaces(peerConfig.getExcludeNamespaces())
152      .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial())
153      .setRemoteWALDir(peerConfig.getRemoteWALDir());
154    return builder;
155  }
156
157  static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBuilder {
158
159    private String clusterKey;
160
161    private String replicationEndpointImpl;
162
163    private Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
164
165    private Map<String, String> configuration = new HashMap<>();
166
167    private Map<TableName, List<String>> tableCFsMap = null;
168
169    private Set<String> namespaces = null;
170
171    // Default value is true, means replicate all user tables to peer cluster.
172    private boolean replicateAllUserTables = true;
173
174    private Map<TableName, List<String>> excludeTableCFsMap = null;
175
176    private Set<String> excludeNamespaces = null;
177
178    private long bandwidth = 0;
179
180    private boolean serial = false;
181
182    private String remoteWALDir = null;
183
184    @Override
185    public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
186      this.clusterKey = clusterKey != null ? clusterKey.trim() : null;
187      return this;
188    }
189
190    @Override
191    public ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl) {
192      this.replicationEndpointImpl = replicationEndpointImpl;
193      return this;
194    }
195
196    @Override
197    public ReplicationPeerConfigBuilder putConfiguration(String key, String value) {
198      this.configuration.put(key, value);
199      return this;
200    }
201
202    @Override
203    public ReplicationPeerConfigBuilder removeConfiguration(String key) {
204      this.configuration.remove(key);
205      return this;
206    }
207
208    @Override
209    public ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value) {
210      this.peerData.put(key, value);
211      return this;
212    }
213
214    @Override
215    public ReplicationPeerConfigBuilder setTableCFsMap(Map<TableName, List<String>> tableCFsMap) {
216      this.tableCFsMap = tableCFsMap;
217      return this;
218    }
219
220    @Override
221    public ReplicationPeerConfigBuilder setNamespaces(Set<String> namespaces) {
222      this.namespaces = namespaces;
223      return this;
224    }
225
226    @Override
227    public ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables) {
228      this.replicateAllUserTables = replicateAllUserTables;
229      return this;
230    }
231
232    @Override
233    public ReplicationPeerConfigBuilder
234      setExcludeTableCFsMap(Map<TableName, List<String>> excludeTableCFsMap) {
235      this.excludeTableCFsMap = excludeTableCFsMap;
236      return this;
237    }
238
239    @Override
240    public ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> excludeNamespaces) {
241      this.excludeNamespaces = excludeNamespaces;
242      return this;
243    }
244
245    @Override
246    public ReplicationPeerConfigBuilder setBandwidth(long bandwidth) {
247      this.bandwidth = bandwidth;
248      return this;
249    }
250
251    @Override
252    public ReplicationPeerConfigBuilder setSerial(boolean serial) {
253      this.serial = serial;
254      return this;
255    }
256
257    @Override
258    public ReplicationPeerConfigBuilder setRemoteWALDir(String dir) {
259      this.remoteWALDir = dir;
260      return this;
261    }
262
263    @Override
264    public ReplicationPeerConfig build() {
265      // It would be nice to validate the configuration, but we have to work with "old" data
266      // from ZK which makes it much more difficult.
267      return new ReplicationPeerConfig(this);
268    }
269  }
270
271  @Override
272  public String toString() {
273    StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
274    builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
275    builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
276    if (replicateAllUserTables) {
277      if (excludeNamespaces != null) {
278        builder.append("excludeNamespaces=").append(excludeNamespaces.toString()).append(",");
279      }
280      if (excludeTableCFsMap != null) {
281        builder.append("excludeTableCFsMap=").append(excludeTableCFsMap.toString()).append(",");
282      }
283    } else {
284      if (namespaces != null) {
285        builder.append("namespaces=").append(namespaces.toString()).append(",");
286      }
287      if (tableCFsMap != null) {
288        builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
289      }
290    }
291    builder.append("bandwidth=").append(bandwidth).append(",");
292    builder.append("serial=").append(serial);
293    if (this.remoteWALDir != null) {
294      builder.append(",remoteWALDir=").append(remoteWALDir);
295    }
296    return builder.toString();
297  }
298
299  /**
300   * Decide whether the table need replicate to the peer cluster
301   * @param table name of the table
302   * @return true if the table need replicate to the peer cluster
303   */
304  public boolean needToReplicate(TableName table) {
305    return needToReplicate(table, null);
306  }
307
308  /**
309   * Decide whether the passed family of the table need replicate to the peer cluster according to
310   * this peer config.
311   * @param table  name of the table
312   * @param family family name
313   * @return true if (the family of) the table need replicate to the peer cluster. If passed family
314   *         is null, return true if any CFs of the table need replicate; If passed family is not
315   *         null, return true if the passed family need replicate.
316   */
317  public boolean needToReplicate(TableName table, byte[] family) {
318    String namespace = table.getNamespaceAsString();
319    if (replicateAllUserTables) {
320      // replicate all user tables, but filter by exclude namespaces and table-cfs config
321      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
322        return false;
323      }
324      // trap here, must check existence first since HashMap allows null value.
325      if (excludeTableCFsMap == null || !excludeTableCFsMap.containsKey(table)) {
326        return true;
327      }
328      Collection<String> cfs = excludeTableCFsMap.get(table);
329      // If cfs is null or empty then we can make sure that we do not need to replicate this table,
330      // otherwise, we may still need to replicate the table but filter out some families.
331      return cfs != null && !cfs.isEmpty()
332      // If exclude-table-cfs contains passed family then we make sure that we do not need to
333      // replicate this family.
334        && (family == null || !cfs.contains(Bytes.toString(family)));
335    } else {
336      // Not replicate all user tables, so filter by namespaces and table-cfs config
337      if (namespaces == null && tableCFsMap == null) {
338        return false;
339      }
340      // First filter by namespaces config
341      // If table's namespace in peer config, all the tables data are applicable for replication
342      if (namespaces != null && namespaces.contains(namespace)) {
343        return true;
344      }
345      // If table-cfs contains this table then we can make sure that we need replicate some CFs of
346      // this table. Further we need all CFs if tableCFsMap.get(table) is null or empty.
347      return tableCFsMap != null && tableCFsMap.containsKey(table)
348        && (family == null || CollectionUtils.isEmpty(tableCFsMap.get(table))
349        // If table-cfs must contain passed family then we need to replicate this family.
350          || tableCFsMap.get(table).contains(Bytes.toString(family)));
351    }
352  }
353}