/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Helper for TableCFs Operations.
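 * <p>
 * A minimal usage sketch of the peer config serialization helpers in this class (the cluster key
 * below is only a placeholder):
 *
 * <pre>
 * ReplicationPeerConfig peerConfig =
 *   ReplicationPeerConfig.newBuilder().setClusterKey("zk1:2181:/hbase").build();
 * // serialize with the pb magic prefix, e.g. for storing as peer znode content
 * byte[] bytes = ReplicationPeerConfigUtil.toByteArray(peerConfig);
 * // parse it back; throws DeserializationException on malformed content
 * ReplicationPeerConfig parsed = ReplicationPeerConfigUtil.parsePeerFrom(bytes);
 * </pre>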
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public final class ReplicationPeerConfigUtil {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
    "hbase.replication.peer.base.config";

  private ReplicationPeerConfigUtil() {
  }

  public static String convertToString(Set<String> namespaces) {
    if (namespaces == null) {
      return null;
    }
    return StringUtils.join(namespaces, ';');
  }

  /** convert map to TableCFs Object */
  public static ReplicationProtos.TableCF[]
    convert(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      tableCFBuilder.clear();
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
      Collection<String> v = entry.getValue();
      if (v != null && !v.isEmpty()) {
        for (String value : entry.getValue()) {
          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    return convert(convert(tableCfs));
  }

  /**
   * Convert string to TableCFs Object. This is only for reading TableCFs information from the
   * TableCF node. Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
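   * <p>
   * A minimal usage sketch (the table and family names are illustrative only):
   *
   * <pre>
   * ReplicationProtos.TableCF[] tableCFs =
   *   ReplicationPeerConfigUtil.convert("ns1.table1:cf1,cf2;table3");
   * // tableCFs[0]: table ns1:table1 with families cf1 and cf2
   * // tableCFs[1]: table default:table3 with no explicit families
   * </pre>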
   */
  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
      return null;
    }

    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    String[] tables = tableCFsConfig.split(";");
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);

    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.length() == 0) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      // for each table: "table:cf1,cf2" or "table"
      String[] pair = tab.split(":");
      String tabName = pair[0].trim();
      if (pair.length > 2 || tabName.length() == 0) {
        LOG.info("incorrect format: " + tableCFsConfig);
        continue;
      }

      tableCFBuilder.clear();
      // split namespace from tableName
      String ns = "default";
      String tName = tabName;
      String[] dbs = tabName.split("\\.");
      if (dbs != null && dbs.length == 2) {
        ns = dbs[0];
        tName = dbs[1];
      }
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));

      // 3 parse "cf1,cf2" part to List<cf>
      if (pair.length == 2) {
        String[] cfsList = pair[1].split(",");
        for (String cf : cfsList) {
          String cfName = cf.trim();
          if (cfName.length() > 0) {
            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
          }
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

  /**
   * Convert TableCFs Object to String. Output String Format:
   * ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
   */
  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
      if (StringUtils.isNotEmpty(namespace)) {
        sb.append(namespace).append(".")
          .append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":");
      } else {
        sb.append(tableCF.getTableName().toString()).append(":");
      }
      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
      }
      sb.deleteCharAt(sb.length() - 1).append(";");
    }
    if (sb.length() > 0) {
      sb.deleteCharAt(sb.length() - 1);
    }
    return sb.toString();
  }

  /**
   * Get the TableCF for the given table from tableCFs, or null if it does not exist.
   */
  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
    String table) {
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
        return tableCF;
      }
    }
    return null;
  }

  /**
   * Parse bytes into TableCFs. It is used for backward compatibility. Old format bytes have no
   * PB_MAGIC header.
   */
  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
    if (bytes == null) {
      return null;
    }
    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
  }

  /**
   * Convert tableCFs string into Map.
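   * <p>
   * A minimal usage sketch (illustrative names):
   *
   * <pre>
   * Map&lt;TableName, List&lt;String&gt;&gt; tableCFsMap =
   *   ReplicationPeerConfigUtil.parseTableCFsFromConfig("ns1.table1:cf1,cf2;table3");
   * // result: {ns1:table1=[cf1, cf2], table3=null}, where null means all column families
   * </pre>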
   */
  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
    return convert2Map(tableCFs);
  }

  /**
   * Convert tableCFs Object to Map.
   */
  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
    if (tableCFs == null || tableCFs.length == 0) {
      return null;
    }
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      List<String> families = new ArrayList<>();
      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
        families.add(tableCF.getFamilies(j).toStringUtf8());
      }
      if (families.size() > 0) {
        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
      } else {
        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
      }
    }

    return tableCFsMap;
  }

  /**
   * @param bytes Content of a peer znode.
   * @return ReplicationPeerConfig parsed from the passed bytes.
   * @throws DeserializationException if the bytes cannot be deserialized
   */
  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
    throws DeserializationException {
    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
      int pbLen = ProtobufUtil.lengthOfPBMagic();
      ReplicationProtos.ReplicationPeer.Builder builder =
        ReplicationProtos.ReplicationPeer.newBuilder();
      ReplicationProtos.ReplicationPeer peer;
      try {
        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
        peer = builder.build();
      } catch (IOException e) {
        throw new DeserializationException(e);
      }
      return convert(peer);
    } else {
      if (bytes == null || bytes.length <= 0) {
        throw new DeserializationException("Bytes to deserialize should not be empty.");
      }
      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
    }
  }

  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    if (peer.hasClusterkey()) {
      builder.setClusterKey(peer.getClusterkey());
    }
    if (peer.hasReplicationEndpointImpl()) {
      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
    }

    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
    }

    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
      builder.putConfiguration(pair.getName(), pair.getValue());
    }

    Map<TableName, List<String>> tableCFsMap = convert2Map(
      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
    if (tableCFsMap != null) {
      builder.setTableCFsMap(tableCFsMap);
    }

    List<ByteString> namespacesList = peer.getNamespacesList();
    if (namespacesList != null && namespacesList.size() != 0) {
      builder.setNamespaces(
        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasBandwidth()) {
      builder.setBandwidth(peer.getBandwidth());
    }

    if (peer.hasReplicateAll()) {
      builder.setReplicateAllUserTables(peer.getReplicateAll());
    }

    if (peer.hasSerial()) {
      builder.setSerial(peer.getSerial());
    }

    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
      .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
    if (excludeTableCFsMap != null) {
      builder.setExcludeTableCFsMap(excludeTableCFsMap);
    }

    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
      builder.setExcludeNamespaces(
        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasRemoteWALDir()) {
      builder.setRemoteWALDir(peer.getRemoteWALDir());
    }
    return builder.build();
  }

  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
    ReplicationProtos.ReplicationPeer.Builder builder =
      ReplicationProtos.ReplicationPeer.newBuilder();
    // we used to set cluster key as required so here we must always set it, until we can make sure
    // that no one uses the old proto file.
    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
    if (peerConfig.getReplicationEndpointImpl() != null) {
      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
    }

    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
        .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
        .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())).build());
    }

    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName(entry.getKey())
        .setValue(entry.getValue()).build());
    }

    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
    if (tableCFs != null) {
      for (int i = 0; i < tableCFs.length; i++) {
        builder.addTableCfs(tableCFs[i]);
      }
    }
    Set<String> namespaces = peerConfig.getNamespaces();
    if (namespaces != null) {
      for (String namespace : namespaces) {
        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    builder.setBandwidth(peerConfig.getBandwidth());
    builder.setReplicateAll(peerConfig.replicateAllUserTables());
    builder.setSerial(peerConfig.isSerial());

    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
    if (excludeTableCFs != null) {
      for (int i = 0; i < excludeTableCFs.length; i++) {
        builder.addExcludeTableCfs(excludeTableCFs[i]);
      }
    }
    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
    if (excludeNamespaces != null) {
      for (String namespace : excludeNamespaces) {
        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    if (peerConfig.getRemoteWALDir() != null) {
      builder.setRemoteWALDir(peerConfig.getRemoteWALDir());
    }
    return builder.build();
  }

  /**
   * @param peerConfig peer config of replication peer
   * @return Serialized protobuf of <code>peerConfig</code> with the pb magic prefix prepended,
   *         suitable for use as the content of a peer znode, i.e. the content of the PEER_ID znode
   *         under /hbase/replication/peers/PEER_ID
   */
  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
    byte[] bytes = convert(peerConfig).toByteArray();
    return ProtobufUtil.prependPBMagic(bytes);
  }

  public static ReplicationPeerDescription
    toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) {
    boolean enabled =
      ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
    ReplicationPeerConfig config = convert(desc.getConfig());
    return new ReplicationPeerDescription(desc.getId(), enabled, config,
      toSyncReplicationState(desc.getSyncReplicationState()));
  }

  public static ReplicationProtos.ReplicationPeerDescription
    toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
    ReplicationProtos.ReplicationPeerDescription.Builder builder =
      ReplicationProtos.ReplicationPeerDescription.newBuilder();
    builder.setId(desc.getPeerId());

    ReplicationProtos.ReplicationState.Builder stateBuilder =
      ReplicationProtos.ReplicationState.newBuilder();
    stateBuilder.setState(desc.isEnabled()
      ? ReplicationProtos.ReplicationState.State.ENABLED
      : ReplicationProtos.ReplicationState.State.DISABLED);
    builder.setState(stateBuilder.build());

    builder.setConfig(convert(desc.getPeerConfig()));
    builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState()));

    return builder.build();
  }

  public static ReplicationProtos.SyncReplicationState
    toSyncReplicationState(SyncReplicationState state) {
    ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder =
      ReplicationProtos.SyncReplicationState.newBuilder();
    syncReplicationStateBuilder
      .setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal()));
    return syncReplicationStateBuilder.build();
  }

  public static SyncReplicationState
    toSyncReplicationState(ReplicationProtos.SyncReplicationState state) {
    return SyncReplicationState.valueOf(state.getState().getNumber());
  }

  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      builder.setTableCFsMap(tableCfs);
    } else {
      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
    }
    return builder.build();
  }

  /**
   * Helper method to add/remove base peer configs from Configuration to ReplicationPeerConfig. This
   * merges the user supplied peer configuration
   * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with the peer configs provided
   * as the property hbase.replication.peer.base.config in the hbase configuration. Expected format
   * for this property is "k1=v1;k2=v2,v2_1;k3=""". If a value is empty, the existing key-value pair
   * will be removed from the peer config.
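   * <p>
   * A minimal usage sketch (the keys and values below are illustrative, not real HBase settings):
   *
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * conf.set(HBASE_REPLICATION_PEER_BASE_CONFIG, "k1=v1;k2=");
   * // adds k1=v1 to the peer's configuration and removes any existing k2 entry from it
   * ReplicationPeerConfig updated =
   *   ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, receivedPeerConfig);
   * </pre>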
   * @param conf the hbase configuration holding the base peer configs
   * @param receivedPeerConfig the peer config supplied by the user
   * @return ReplicationPeerConfig containing the updated configs.
   */
  public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf,
    ReplicationPeerConfig receivedPeerConfig) {
    ReplicationPeerConfigBuilder copiedPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(receivedPeerConfig);

    Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
    if (basePeerConfigs.length() != 0) {
      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
        .withKeyValueSeparator("=").split(basePeerConfigs);
      for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) {
        String configName = entry.getKey();
        String configValue = entry.getValue();
        // If a config is provided with an empty value, e.g. k1="",
        // we remove it from the peer config. The empty value must be given explicitly
        // so that no other config is removed unknowingly.
        if (Strings.isNullOrEmpty(configValue)) {
          copiedPeerConfigBuilder.removeConfiguration(configName);
        } else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) {
          // update the configuration if the exact config and value do not already exist
          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
        }
      }
    }

    return copiedPeerConfigBuilder.build();
  }

  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      builder.setExcludeTableCFsMap(excludeTableCfs);
    } else {
      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
    }
    return builder.build();
  }

  private static Map<TableName, List<String>>
    mergeTableCFs(Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
          newTableCfs.put(table, null);
        } else {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.addAll(appendCfs);
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        if (appendCfs == null || appendCfs.isEmpty()) {
          newTableCfs.put(table, null);
        } else {
          newTableCfs.put(table, Lists.newArrayList(appendCfs));
        }
      }
    }
    return newTableCfs;
  }

  private static Map<TableName, List<String>>
    copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
    Map<TableName, List<String>> newTableCfs = new HashMap<>();
    preTableCfs.forEach(
      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
    return newTableCfs;
  }

  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          newTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newTableCfs.remove(table);
          } else {
            newTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
          "No table: " + table + " in table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setTableCFsMap(newTableCfs);
    return builder.build();
  }

  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newExcludeTableCfs.containsKey(table)) {
        List<String> cfs = newExcludeTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          newExcludeTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newExcludeTableCfs.remove(table);
          } else {
            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from exclude-table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
          "No table: " + table + " in exclude-table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setExcludeTableCFsMap(newExcludeTableCfs);
    return builder.build();
  }

  /**
   * Returns the configuration needed to talk to the remote slave cluster.
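   * <p>
   * A minimal usage sketch (peer id, cluster key and the extra peer setting are placeholders):
   *
   * <pre>
   * ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
   *   .setClusterKey("zk1:2181:/hbase").putConfiguration("hbase.rpc.timeout", "30000").build();
   * ReplicationPeerDescription peer =
   *   new ReplicationPeerDescription("1", true, peerConfig, SyncReplicationState.NONE);
   * Configuration peerConf = ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peer);
   * // peerConf is the peer cluster's configuration with hbase.rpc.timeout=30000 layered on top
   * </pre>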
   * @param conf the base configuration
   * @param peer the description of the replication peer
   * @return the configuration for the peer cluster
   * @throws IOException if creating the peer cluster configuration failed
   */
  public static Configuration getPeerClusterConfiguration(Configuration conf,
    ReplicationPeerDescription peer) throws IOException {
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    Configuration otherConf;
    try {
      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
    } catch (IOException e) {
      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
    }

    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return compound;
    }
    return otherConf;
  }
}