001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.stream.Collectors;
030import org.apache.commons.lang3.StringUtils;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.CompoundConfiguration;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.exceptions.DeserializationException;
036import org.apache.hadoop.hbase.replication.ReplicationException;
037import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
038import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
039import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
040import org.apache.hadoop.hbase.util.Bytes;
041import org.apache.yetus.audience.InterfaceAudience;
042import org.apache.yetus.audience.InterfaceStability;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
047import org.apache.hbase.thirdparty.com.google.common.base.Strings;
048import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
049import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
050import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
051
052import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
055
056/**
057 * Helper for TableCFs Operations.
058 */
059@InterfaceAudience.Private
060@InterfaceStability.Stable
061public final class ReplicationPeerConfigUtil {
062
063  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
064  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
065    "hbase.replication.peer.base.config";
066
067  private ReplicationPeerConfigUtil() {
068  }
069
070  public static String convertToString(Set<String> namespaces) {
071    if (namespaces == null) {
072      return null;
073    }
074    return StringUtils.join(namespaces, ';');
075  }
076
077  /** convert map to TableCFs Object */
078  public static ReplicationProtos.TableCF[]
079    convert(Map<TableName, ? extends Collection<String>> tableCfs) {
080    if (tableCfs == null) {
081      return null;
082    }
083    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
084    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
085    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
086      tableCFBuilder.clear();
087      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
088      Collection<String> v = entry.getValue();
089      if (v != null && !v.isEmpty()) {
090        for (String value : entry.getValue()) {
091          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
092        }
093      }
094      tableCFList.add(tableCFBuilder.build());
095    }
096    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
097  }
098
099  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
100    if (tableCfs == null) {
101      return null;
102    }
103    return convert(convert(tableCfs));
104  }
105
106  /**
107   * Convert string to TableCFs Object. This is only for read TableCFs information from TableCF
108   * node. Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
109   */
110  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
111    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
112      return null;
113    }
114
115    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
116    List<String> tables = Splitter.on(';').splitToList(tableCFsConfig);
117    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.size());
118
119    for (String tab : tables) {
120      // 1 ignore empty table config
121      tab = tab.trim();
122      if (tab.length() == 0) {
123        continue;
124      }
125      // 2 split to "table" and "cf1,cf2"
126      // for each table: "table#cf1,cf2" or "table"
127      List<String> pair = Splitter.on(':').splitToList(tab);
128      if (pair.size() > 2) {
129        LOG.info("incorrect format:" + tableCFsConfig);
130        continue;
131      }
132      assert pair.size() > 0;
133      Iterator<String> i = pair.iterator();
134      String tabName = i.next().trim();
135      if (tabName.length() == 0) {
136        LOG.info("incorrect format:" + tableCFsConfig);
137        continue;
138      }
139
140      tableCFBuilder.clear();
141      // split namespace from tableName
142      String ns = "default";
143      String tName = tabName;
144      List<String> dbs = Splitter.on('.').splitToList(tabName);
145      if (dbs != null && dbs.size() == 2) {
146        Iterator<String> ii = dbs.iterator();
147        ns = ii.next();
148        tName = ii.next();
149      }
150      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
151
152      // 3 parse "cf1,cf2" part to List<cf>
153      if (i.hasNext()) {
154        List<String> cfsList = Splitter.on(',').splitToList(i.next());
155        for (String cf : cfsList) {
156          String cfName = cf.trim();
157          if (cfName.length() > 0) {
158            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
159          }
160        }
161      }
162      tableCFList.add(tableCFBuilder.build());
163    }
164    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
165  }
166
167  /**
168   * Convert TableCFs Object to String. Output String Format:
169   * ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
170   */
171  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
172    StringBuilder sb = new StringBuilder();
173    for (int i = 0, n = tableCFs.length; i < n; i++) {
174      ReplicationProtos.TableCF tableCF = tableCFs[i];
175      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
176      if (StringUtils.isNotEmpty(namespace)) {
177        sb.append(namespace).append(".")
178          .append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":");
179      } else {
180        sb.append(tableCF.getTableName().toString()).append(":");
181      }
182      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
183        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
184      }
185      sb.deleteCharAt(sb.length() - 1).append(";");
186    }
187    if (sb.length() > 0) {
188      sb.deleteCharAt(sb.length() - 1);
189    }
190    return sb.toString();
191  }
192
193  /**
194   * Get TableCF in TableCFs, if not exist, return null.
195   */
196  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
197    String table) {
198    for (int i = 0, n = tableCFs.length; i < n; i++) {
199      ReplicationProtos.TableCF tableCF = tableCFs[i];
200      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
201        return tableCF;
202      }
203    }
204    return null;
205  }
206
207  /**
208   * Parse bytes into TableCFs. It is used for backward compatibility. Old format bytes have no
209   * PB_MAGIC Header
210   */
211  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
212    if (bytes == null) {
213      return null;
214    }
215    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
216  }
217
218  /**
219   * Convert tableCFs string into Map.
220   */
221  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
222    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
223    return convert2Map(tableCFs);
224  }
225
226  /**
227   * Convert tableCFs Object to Map.
228   */
229  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
230    if (tableCFs == null || tableCFs.length == 0) {
231      return null;
232    }
233    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
234    for (int i = 0, n = tableCFs.length; i < n; i++) {
235      ReplicationProtos.TableCF tableCF = tableCFs[i];
236      List<String> families = new ArrayList<>();
237      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
238        families.add(tableCF.getFamilies(j).toStringUtf8());
239      }
240      if (families.size() > 0) {
241        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
242      } else {
243        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
244      }
245    }
246
247    return tableCFsMap;
248  }
249
250  /**
251   * Parse the serialized representation of a peer configuration.
252   * @param bytes Content of a peer znode.
253   * @return ClusterKey parsed from the passed bytes.
254   * @throws DeserializationException deserialization exception
255   */
256  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
257    throws DeserializationException {
258    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
259      int pbLen = ProtobufUtil.lengthOfPBMagic();
260      ReplicationProtos.ReplicationPeer.Builder builder =
261        ReplicationProtos.ReplicationPeer.newBuilder();
262      ReplicationProtos.ReplicationPeer peer;
263      try {
264        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
265        peer = builder.build();
266      } catch (IOException e) {
267        throw new DeserializationException(e);
268      }
269      return convert(peer);
270    } else {
271      if (bytes == null || bytes.length <= 0) {
272        throw new DeserializationException("Bytes to deserialize should not be empty.");
273      }
274      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
275    }
276  }
277
278  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
279    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
280    if (peer.hasClusterkey()) {
281      builder.setClusterKey(peer.getClusterkey());
282    }
283    if (peer.hasReplicationEndpointImpl()) {
284      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
285    }
286
287    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
288      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
289    }
290
291    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
292      builder.putConfiguration(pair.getName(), pair.getValue());
293    }
294
295    Map<TableName, List<String>> tableCFsMap = convert2Map(
296      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
297    if (tableCFsMap != null) {
298      builder.setTableCFsMap(tableCFsMap);
299    }
300
301    List<ByteString> namespacesList = peer.getNamespacesList();
302    if (namespacesList != null && namespacesList.size() != 0) {
303      builder.setNamespaces(
304        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
305    }
306
307    if (peer.hasBandwidth()) {
308      builder.setBandwidth(peer.getBandwidth());
309    }
310
311    if (peer.hasReplicateAll()) {
312      builder.setReplicateAllUserTables(peer.getReplicateAll());
313    }
314
315    if (peer.hasSerial()) {
316      builder.setSerial(peer.getSerial());
317    }
318
319    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
320      .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
321    if (excludeTableCFsMap != null) {
322      builder.setExcludeTableCFsMap(excludeTableCFsMap);
323    }
324
325    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
326    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
327      builder.setExcludeNamespaces(
328        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
329    }
330
331    return builder.build();
332  }
333
334  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
335    ReplicationProtos.ReplicationPeer.Builder builder =
336      ReplicationProtos.ReplicationPeer.newBuilder();
337    // we used to set cluster key as required so here we must always set it, until we can make sure
338    // that no one uses the old proto file.
339    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
340    if (peerConfig.getReplicationEndpointImpl() != null) {
341      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
342    }
343
344    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
345      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
346        .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
347        .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())).build());
348    }
349
350    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
351      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName(entry.getKey())
352        .setValue(entry.getValue()).build());
353    }
354
355    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
356    if (tableCFs != null) {
357      for (int i = 0; i < tableCFs.length; i++) {
358        builder.addTableCfs(tableCFs[i]);
359      }
360    }
361    Set<String> namespaces = peerConfig.getNamespaces();
362    if (namespaces != null) {
363      for (String namespace : namespaces) {
364        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
365      }
366    }
367
368    builder.setBandwidth(peerConfig.getBandwidth());
369    builder.setReplicateAll(peerConfig.replicateAllUserTables());
370    builder.setSerial(peerConfig.isSerial());
371
372    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
373    if (excludeTableCFs != null) {
374      for (int i = 0; i < excludeTableCFs.length; i++) {
375        builder.addExcludeTableCfs(excludeTableCFs[i]);
376      }
377    }
378    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
379    if (excludeNamespaces != null) {
380      for (String namespace : excludeNamespaces) {
381        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
382      }
383    }
384
385    return builder.build();
386  }
387
388  /**
389   * Returns Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
390   * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
391   * /hbase/replication/peers/PEER_ID
392   */
393  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
394    byte[] bytes = convert(peerConfig).toByteArray();
395    return ProtobufUtil.prependPBMagic(bytes);
396  }
397
398  public static ReplicationPeerDescription
399    toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) {
400    boolean enabled =
401      ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
402    ReplicationPeerConfig config = convert(desc.getConfig());
403    return new ReplicationPeerDescription(desc.getId(), enabled, config);
404  }
405
406  public static ReplicationProtos.ReplicationPeerDescription
407    toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
408    ReplicationProtos.ReplicationPeerDescription.Builder builder =
409      ReplicationProtos.ReplicationPeerDescription.newBuilder();
410    builder.setId(desc.getPeerId());
411    ReplicationProtos.ReplicationState.Builder stateBuilder =
412      ReplicationProtos.ReplicationState.newBuilder();
413    stateBuilder.setState(desc.isEnabled()
414      ? ReplicationProtos.ReplicationState.State.ENABLED
415      : ReplicationProtos.ReplicationState.State.DISABLED);
416    builder.setState(stateBuilder.build());
417    builder.setConfig(convert(desc.getPeerConfig()));
418    return builder.build();
419  }
420
421  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
422    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
423    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
424    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
425    if (preTableCfs == null) {
426      builder.setTableCFsMap(tableCfs);
427    } else {
428      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
429    }
430    return builder.build();
431  }
432
433  /**
434   * Helper method to add/removev base peer configs from Configuration to ReplicationPeerConfig This
435   * merges the user supplied peer configuration
436   * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs provided as
437   * property hbase.replication.peer.base.configs in hbase configuration. Expected format for this
438   * hbase configuration is "k1=v1;k2=v2,v2_1;k3=""". If value is empty, it will remove the existing
439   * key-value from peer config.
440   * @param conf Configuration
441   * @return ReplicationPeerConfig containing updated configs.
442   */
443  public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf,
444    ReplicationPeerConfig receivedPeerConfig) {
445    ReplicationPeerConfigBuilder copiedPeerConfigBuilder =
446      ReplicationPeerConfig.newBuilder(receivedPeerConfig);
447
448    Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
449    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
450    if (basePeerConfigs.length() != 0) {
451      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
452        .withKeyValueSeparator("=").split(basePeerConfigs);
453      for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) {
454        String configName = entry.getKey();
455        String configValue = entry.getValue();
456        // If the config is provided with empty value, for eg. k1="",
457        // we remove it from peer config. Providing config with empty value
458        // is required so that it doesn't remove any other config unknowingly.
459        if (Strings.isNullOrEmpty(configValue)) {
460          copiedPeerConfigBuilder.removeConfiguration(configName);
461        } else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) {
462          // update the configuration if exact config and value doesn't exists
463          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
464        }
465      }
466    }
467
468    return copiedPeerConfigBuilder.build();
469  }
470
471  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
472    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
473    throws ReplicationException {
474    if (excludeTableCfs == null) {
475      throw new ReplicationException("exclude tableCfs is null");
476    }
477    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
478    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
479    if (preExcludeTableCfs == null) {
480      builder.setExcludeTableCFsMap(excludeTableCfs);
481    } else {
482      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
483    }
484    return builder.build();
485  }
486
487  private static Map<TableName, List<String>>
488    mergeTableCFs(Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
489    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
490    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
491      TableName table = entry.getKey();
492      Collection<String> appendCfs = entry.getValue();
493      if (newTableCfs.containsKey(table)) {
494        List<String> cfs = newTableCfs.get(table);
495        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
496          newTableCfs.put(table, null);
497        } else {
498          Set<String> cfSet = new HashSet<String>(cfs);
499          cfSet.addAll(appendCfs);
500          newTableCfs.put(table, Lists.newArrayList(cfSet));
501        }
502      } else {
503        if (appendCfs == null || appendCfs.isEmpty()) {
504          newTableCfs.put(table, null);
505        } else {
506          newTableCfs.put(table, Lists.newArrayList(appendCfs));
507        }
508      }
509    }
510    return newTableCfs;
511  }
512
513  private static Map<TableName, List<String>>
514    copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
515    Map<TableName, List<String>> newTableCfs = new HashMap<>();
516    preTableCfs.forEach(
517      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
518    return newTableCfs;
519  }
520
521  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
522    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig, String id)
523    throws ReplicationException {
524    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
525    if (preTableCfs == null) {
526      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
527    }
528    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
529    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
530      TableName table = entry.getKey();
531      Collection<String> removeCfs = entry.getValue();
532      if (newTableCfs.containsKey(table)) {
533        List<String> cfs = newTableCfs.get(table);
534        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
535          newTableCfs.remove(table);
536        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
537          Set<String> cfSet = new HashSet<String>(cfs);
538          cfSet.removeAll(removeCfs);
539          if (cfSet.isEmpty()) {
540            newTableCfs.remove(table);
541          } else {
542            newTableCfs.put(table, Lists.newArrayList(cfSet));
543          }
544        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
545          throw new ReplicationException("Cannot remove cf of table: " + table
546            + " which doesn't specify cfs from table-cfs config in peer: " + id);
547        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
548          throw new ReplicationException("Cannot remove table: " + table
549            + " which has specified cfs from table-cfs config in peer: " + id);
550        }
551      } else {
552        throw new ReplicationException(
553          "No table: " + table + " in table-cfs config of peer: " + id);
554      }
555    }
556    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
557    builder.setTableCFsMap(newTableCfs);
558    return builder.build();
559  }
560
561  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
562    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
563    throws ReplicationException {
564    if (excludeTableCfs == null) {
565      throw new ReplicationException("exclude tableCfs is null");
566    }
567    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
568    if (preExcludeTableCfs == null) {
569      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
570    }
571    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
572    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
573      TableName table = entry.getKey();
574      Collection<String> removeCfs = entry.getValue();
575      if (newExcludeTableCfs.containsKey(table)) {
576        List<String> cfs = newExcludeTableCfs.get(table);
577        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
578          newExcludeTableCfs.remove(table);
579        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
580          Set<String> cfSet = new HashSet<String>(cfs);
581          cfSet.removeAll(removeCfs);
582          if (cfSet.isEmpty()) {
583            newExcludeTableCfs.remove(table);
584          } else {
585            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
586          }
587        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
588          throw new ReplicationException("Cannot remove cf of table: " + table
589            + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
590        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
591          throw new ReplicationException("Cannot remove table: " + table
592            + " which has specified cfs from exclude-table-cfs config in peer: " + id);
593        }
594      } else {
595        throw new ReplicationException(
596          "No table: " + table + " in exclude-table-cfs config of peer: " + id);
597      }
598    }
599    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
600    builder.setExcludeTableCFsMap(newExcludeTableCfs);
601    return builder.build();
602  }
603
604  /**
605   * Returns the configuration needed to talk to the remote slave cluster.
606   * @param conf the base configuration
607   * @param peer the description of replication peer
608   * @return the configuration for the peer cluster, null if it was unable to get the configuration
609   * @throws IOException when create peer cluster configuration failed
610   */
611  public static Configuration getPeerClusterConfiguration(Configuration conf,
612    ReplicationPeerDescription peer) throws IOException {
613    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
614    Configuration otherConf;
615    try {
616      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
617    } catch (IOException e) {
618      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
619    }
620
621    if (!peerConfig.getConfiguration().isEmpty()) {
622      CompoundConfiguration compound = new CompoundConfiguration();
623      compound.add(otherConf);
624      compound.addStringMap(peerConfig.getConfiguration());
625      return compound;
626    }
627    return otherConf;
628  }
629}