001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import java.util.Collection;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.TreeMap;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.util.Bytes;
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
032
033/**
034 * A configuration for the replication peer cluster.
035 */
036@InterfaceAudience.Public
037public class ReplicationPeerConfig {
038
039  private String clusterKey;
040  private String replicationEndpointImpl;
041  private final Map<byte[], byte[]> peerData;
042  private final Map<String, String> configuration;
043  private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
044  private Set<String> namespaces = null;
045  // Default value is true, means replicate all user tables to peer cluster.
046  private boolean replicateAllUserTables = true;
047  private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
048  private Set<String> excludeNamespaces = null;
049  private long bandwidth = 0;
050  private final boolean serial;
051
052  private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
053    this.clusterKey = builder.clusterKey;
054    this.replicationEndpointImpl = builder.replicationEndpointImpl;
055    this.peerData = Collections.unmodifiableMap(builder.peerData);
056    this.configuration = Collections.unmodifiableMap(builder.configuration);
057    this.tableCFsMap =
058      builder.tableCFsMap != null ? unmodifiableTableCFsMap(builder.tableCFsMap) : null;
059    this.namespaces =
060      builder.namespaces != null ? Collections.unmodifiableSet(builder.namespaces) : null;
061    this.replicateAllUserTables = builder.replicateAllUserTables;
062    this.excludeTableCFsMap = builder.excludeTableCFsMap != null
063      ? unmodifiableTableCFsMap(builder.excludeTableCFsMap)
064      : null;
065    this.excludeNamespaces = builder.excludeNamespaces != null
066      ? Collections.unmodifiableSet(builder.excludeNamespaces)
067      : null;
068    this.bandwidth = builder.bandwidth;
069    this.serial = builder.serial;
070  }
071
072  private Map<TableName, List<String>>
073    unmodifiableTableCFsMap(Map<TableName, List<String>> tableCFsMap) {
074    Map<TableName, List<String>> newTableCFsMap = new HashMap<>();
075    tableCFsMap.forEach((table, cfs) -> newTableCFsMap.put(table,
076      cfs != null ? Collections.unmodifiableList(cfs) : null));
077    return Collections.unmodifiableMap(newTableCFsMap);
078  }
079
080  /**
081   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
082   *             {@link ReplicationPeerConfigBuilder} to create new ReplicationPeerConfig.
083   */
084  @Deprecated
085  public ReplicationPeerConfig() {
086    this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
087    this.configuration = new HashMap<>(0);
088    this.serial = false;
089  }
090
091  /**
092   * Set the clusterKey which is the concatenation of the slave cluster's:
093   * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
094   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
095   *             {@link ReplicationPeerConfigBuilder#setClusterKey(String)} instead.
096   */
097  @Deprecated
098  public ReplicationPeerConfig setClusterKey(String clusterKey) {
099    this.clusterKey = clusterKey;
100    return this;
101  }
102
103  /**
104   * Sets the ReplicationEndpoint plugin class for this peer.
105   * @param replicationEndpointImpl a class implementing ReplicationEndpoint
106   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
107   *             {@link ReplicationPeerConfigBuilder#setReplicationEndpointImpl(String)} instead.
108   */
109  @Deprecated
110  public ReplicationPeerConfig setReplicationEndpointImpl(String replicationEndpointImpl) {
111    this.replicationEndpointImpl = replicationEndpointImpl;
112    return this;
113  }
114
115  public String getClusterKey() {
116    return clusterKey;
117  }
118
119  public String getReplicationEndpointImpl() {
120    return replicationEndpointImpl;
121  }
122
123  public Map<byte[], byte[]> getPeerData() {
124    return peerData;
125  }
126
127  public Map<String, String> getConfiguration() {
128    return configuration;
129  }
130
131  public Map<TableName, List<String>> getTableCFsMap() {
132    return (Map<TableName, List<String>>) tableCFsMap;
133  }
134
135  /**
136   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
137   *             {@link ReplicationPeerConfigBuilder#setTableCFsMap(Map)} instead.
138   */
139  @Deprecated
140  public ReplicationPeerConfig
141    setTableCFsMap(Map<TableName, ? extends Collection<String>> tableCFsMap) {
142    this.tableCFsMap = tableCFsMap;
143    return this;
144  }
145
146  public Set<String> getNamespaces() {
147    return this.namespaces;
148  }
149
150  /**
151   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
152   *             {@link ReplicationPeerConfigBuilder#setNamespaces(Set)} instead.
153   */
154  @Deprecated
155  public ReplicationPeerConfig setNamespaces(Set<String> namespaces) {
156    this.namespaces = namespaces;
157    return this;
158  }
159
160  public long getBandwidth() {
161    return this.bandwidth;
162  }
163
164  /**
165   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
166   *             {@link ReplicationPeerConfigBuilder#setBandwidth(long)} instead.
167   */
168  @Deprecated
169  public ReplicationPeerConfig setBandwidth(long bandwidth) {
170    this.bandwidth = bandwidth;
171    return this;
172  }
173
174  public boolean replicateAllUserTables() {
175    return this.replicateAllUserTables;
176  }
177
178  /**
179   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
180   *             {@link ReplicationPeerConfigBuilder#setReplicateAllUserTables(boolean)} instead.
181   */
182  @Deprecated
183  public ReplicationPeerConfig setReplicateAllUserTables(boolean replicateAllUserTables) {
184    this.replicateAllUserTables = replicateAllUserTables;
185    return this;
186  }
187
188  public Map<TableName, List<String>> getExcludeTableCFsMap() {
189    return (Map<TableName, List<String>>) excludeTableCFsMap;
190  }
191
192  /**
193   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
194   *             {@link ReplicationPeerConfigBuilder#setExcludeTableCFsMap(Map)} instead.
195   */
196  @Deprecated
197  public ReplicationPeerConfig
198    setExcludeTableCFsMap(Map<TableName, ? extends Collection<String>> tableCFsMap) {
199    this.excludeTableCFsMap = tableCFsMap;
200    return this;
201  }
202
203  public Set<String> getExcludeNamespaces() {
204    return this.excludeNamespaces;
205  }
206
207  /**
208   * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
209   *             {@link ReplicationPeerConfigBuilder#setExcludeNamespaces(Set)} instead.
210   */
211  @Deprecated
212  public ReplicationPeerConfig setExcludeNamespaces(Set<String> namespaces) {
213    this.excludeNamespaces = namespaces;
214    return this;
215  }
216
217  public static ReplicationPeerConfigBuilder newBuilder() {
218    return new ReplicationPeerConfigBuilderImpl();
219  }
220
221  public boolean isSerial() {
222    return serial;
223  }
224
225  public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
226    ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
227    builder.setClusterKey(peerConfig.getClusterKey())
228      .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
229      .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
230      .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
231      .setReplicateAllUserTables(peerConfig.replicateAllUserTables())
232      .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
233      .setExcludeNamespaces(peerConfig.getExcludeNamespaces())
234      .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial());
235    return builder;
236  }
237
238  static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBuilder {
239
240    private String clusterKey;
241
242    private String replicationEndpointImpl;
243
244    private Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
245
246    private Map<String, String> configuration = new HashMap<>();
247
248    private Map<TableName, List<String>> tableCFsMap = null;
249
250    private Set<String> namespaces = null;
251
252    // Default value is true, means replicate all user tables to peer cluster.
253    private boolean replicateAllUserTables = true;
254
255    private Map<TableName, List<String>> excludeTableCFsMap = null;
256
257    private Set<String> excludeNamespaces = null;
258
259    private long bandwidth = 0;
260
261    private boolean serial = false;
262
263    @Override
264    public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
265      this.clusterKey = clusterKey != null ? clusterKey.trim() : null;
266      return this;
267    }
268
269    @Override
270    public ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl) {
271      this.replicationEndpointImpl = replicationEndpointImpl;
272      return this;
273    }
274
275    @Override
276    public ReplicationPeerConfigBuilder putConfiguration(String key, String value) {
277      this.configuration.put(key, value);
278      return this;
279    }
280
281    @Override
282    public ReplicationPeerConfigBuilder removeConfiguration(String key) {
283      this.configuration.remove(key);
284      return this;
285    }
286
287    @Override
288    public ReplicationPeerConfigBuilder putPeerData(byte[] key, byte[] value) {
289      this.peerData.put(key, value);
290      return this;
291    }
292
293    @Override
294    public ReplicationPeerConfigBuilder setTableCFsMap(Map<TableName, List<String>> tableCFsMap) {
295      this.tableCFsMap = tableCFsMap;
296      return this;
297    }
298
299    @Override
300    public ReplicationPeerConfigBuilder setNamespaces(Set<String> namespaces) {
301      this.namespaces = namespaces;
302      return this;
303    }
304
305    @Override
306    public ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables) {
307      this.replicateAllUserTables = replicateAllUserTables;
308      return this;
309    }
310
311    @Override
312    public ReplicationPeerConfigBuilder
313      setExcludeTableCFsMap(Map<TableName, List<String>> excludeTableCFsMap) {
314      this.excludeTableCFsMap = excludeTableCFsMap;
315      return this;
316    }
317
318    @Override
319    public ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> excludeNamespaces) {
320      this.excludeNamespaces = excludeNamespaces;
321      return this;
322    }
323
324    @Override
325    public ReplicationPeerConfigBuilder setBandwidth(long bandwidth) {
326      this.bandwidth = bandwidth;
327      return this;
328    }
329
330    @Override
331    public ReplicationPeerConfigBuilder setSerial(boolean serial) {
332      this.serial = serial;
333      return this;
334    }
335
336    @Override
337    public ReplicationPeerConfig build() {
338      // It would be nice to validate the configuration, but we have to work with "old" data
339      // from ZK which makes it much more difficult.
340      return new ReplicationPeerConfig(this);
341    }
342  }
343
344  @Override
345  public String toString() {
346    StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
347    builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
348    builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
349    if (replicateAllUserTables) {
350      if (excludeNamespaces != null) {
351        builder.append("excludeNamespaces=").append(excludeNamespaces.toString()).append(",");
352      }
353      if (excludeTableCFsMap != null) {
354        builder.append("excludeTableCFsMap=").append(excludeTableCFsMap.toString()).append(",");
355      }
356    } else {
357      if (namespaces != null) {
358        builder.append("namespaces=").append(namespaces.toString()).append(",");
359      }
360      if (tableCFsMap != null) {
361        builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
362      }
363    }
364    builder.append("bandwidth=").append(bandwidth).append(",");
365    builder.append("serial=").append(serial);
366    return builder.toString();
367  }
368
369  /**
370   * Decide whether the table need replicate to the peer cluster
371   * @param table name of the table
372   * @return true if the table need replicate to the peer cluster
373   */
374  public boolean needToReplicate(TableName table) {
375    return needToReplicate(table, null);
376  }
377
378  /**
379   * Decide whether the passed family of the table need replicate to the peer cluster according to
380   * this peer config.
381   * @param table  name of the table
382   * @param family family name
383   * @return true if (the family of) the table need replicate to the peer cluster. If passed family
384   *         is null, return true if any CFs of the table need replicate; If passed family is not
385   *         null, return true if the passed family need replicate.
386   */
387  public boolean needToReplicate(TableName table, byte[] family) {
388    String namespace = table.getNamespaceAsString();
389    if (replicateAllUserTables) {
390      // replicate all user tables, but filter by exclude namespaces and table-cfs config
391      if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
392        return false;
393      }
394      // trap here, must check existence first since HashMap allows null value.
395      if (excludeTableCFsMap == null || !excludeTableCFsMap.containsKey(table)) {
396        return true;
397      }
398      Collection<String> cfs = excludeTableCFsMap.get(table);
399      // If cfs is null or empty then we can make sure that we do not need to replicate this table,
400      // otherwise, we may still need to replicate the table but filter out some families.
401      return cfs != null && !cfs.isEmpty()
402      // If exclude-table-cfs contains passed family then we make sure that we do not need to
403      // replicate this family.
404        && (family == null || !cfs.contains(Bytes.toString(family)));
405    } else {
406      // Not replicate all user tables, so filter by namespaces and table-cfs config
407      if (namespaces == null && tableCFsMap == null) {
408        return false;
409      }
410      // First filter by namespaces config
411      // If table's namespace in peer config, all the tables data are applicable for replication
412      if (namespaces != null && namespaces.contains(namespace)) {
413        return true;
414      }
415      // If table-cfs contains this table then we can make sure that we need replicate some CFs of
416      // this table. Further we need all CFs if tableCFsMap.get(table) is null or empty.
417      return tableCFsMap != null && tableCFsMap.containsKey(table)
418        && (family == null || CollectionUtils.isEmpty(tableCFsMap.get(table))
419        // If table-cfs must contain passed family then we need to replicate this family.
420          || tableCFsMap.get(table).contains(Bytes.toString(family)));
421    }
422  }
423}