001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client.replication;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.stream.Collectors;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CompoundConfiguration;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.exceptions.DeserializationException;
037import org.apache.hadoop.hbase.replication.ReplicationException;
038import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
041import org.apache.hadoop.hbase.replication.SyncReplicationState;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.apache.yetus.audience.InterfaceStability;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
049import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
050import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
054
055/**
056 * Helper for TableCFs Operations.
057 */
058@InterfaceAudience.Private
059@InterfaceStability.Stable
060public final class ReplicationPeerConfigUtil {
061
062  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
063
064  private ReplicationPeerConfigUtil() {}
065
066  public static String convertToString(Set<String> namespaces) {
067    if (namespaces == null) {
068      return null;
069    }
070    return StringUtils.join(namespaces, ';');
071  }
072
073  /** convert map to TableCFs Object */
074  public static ReplicationProtos.TableCF[] convert(
075      Map<TableName, ? extends Collection<String>> tableCfs) {
076    if (tableCfs == null) {
077      return null;
078    }
079    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
080    ReplicationProtos.TableCF.Builder tableCFBuilder =  ReplicationProtos.TableCF.newBuilder();
081    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
082      tableCFBuilder.clear();
083      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
084      Collection<String> v = entry.getValue();
085      if (v != null && !v.isEmpty()) {
086        for (String value : entry.getValue()) {
087          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
088        }
089      }
090      tableCFList.add(tableCFBuilder.build());
091    }
092    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
093  }
094
095  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
096    if (tableCfs == null) {
097      return null;
098    }
099    return convert(convert(tableCfs));
100  }
101
102  /**
103   *  Convert string to TableCFs Object.
104   *  This is only for read TableCFs information from TableCF node.
105   *  Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
106   * */
107  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
108    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
109      return null;
110    }
111
112    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
113    String[] tables = tableCFsConfig.split(";");
114    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);
115
116    for (String tab : tables) {
117      // 1 ignore empty table config
118      tab = tab.trim();
119      if (tab.length() == 0) {
120        continue;
121      }
122      // 2 split to "table" and "cf1,cf2"
123      //   for each table: "table#cf1,cf2" or "table"
124      String[] pair = tab.split(":");
125      String tabName = pair[0].trim();
126      if (pair.length > 2 || tabName.length() == 0) {
127        LOG.info("incorrect format:" + tableCFsConfig);
128        continue;
129      }
130
131      tableCFBuilder.clear();
132      // split namespace from tableName
133      String ns = "default";
134      String tName = tabName;
135      String[] dbs = tabName.split("\\.");
136      if (dbs != null && dbs.length == 2) {
137        ns = dbs[0];
138        tName = dbs[1];
139      }
140      tableCFBuilder.setTableName(
141        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
142
143      // 3 parse "cf1,cf2" part to List<cf>
144      if (pair.length == 2) {
145        String[] cfsList = pair[1].split(",");
146        for (String cf : cfsList) {
147          String cfName = cf.trim();
148          if (cfName.length() > 0) {
149            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
150          }
151        }
152      }
153      tableCFList.add(tableCFBuilder.build());
154    }
155    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
156  }
157
158  /**
159   *  Convert TableCFs Object to String.
160   *  Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
161   * */
162  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
163    StringBuilder sb = new StringBuilder();
164    for (int i = 0, n = tableCFs.length; i < n; i++) {
165      ReplicationProtos.TableCF tableCF = tableCFs[i];
166      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
167      if (StringUtils.isNotEmpty(namespace)) {
168        sb.append(namespace).append(".").
169            append(tableCF.getTableName().getQualifier().toStringUtf8())
170            .append(":");
171      } else {
172        sb.append(tableCF.getTableName().toString()).append(":");
173      }
174      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
175        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
176      }
177      sb.deleteCharAt(sb.length() - 1).append(";");
178    }
179    if (sb.length() > 0) {
180      sb.deleteCharAt(sb.length() - 1);
181    }
182    return sb.toString();
183  }
184
185  /**
186   *  Get TableCF in TableCFs, if not exist, return null.
187   * */
188  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
189                                           String table) {
190    for (int i = 0, n = tableCFs.length; i < n; i++) {
191      ReplicationProtos.TableCF tableCF = tableCFs[i];
192      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
193        return tableCF;
194      }
195    }
196    return null;
197  }
198
199  /**
200   *  Parse bytes into TableCFs.
201   *  It is used for backward compatibility.
202   *  Old format bytes have no PB_MAGIC Header
203   * */
204  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
205    if (bytes == null) {
206      return null;
207    }
208    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
209  }
210
211  /**
212   *  Convert tableCFs string into Map.
213   * */
214  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
215    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
216    return convert2Map(tableCFs);
217  }
218
219  /**
220   *  Convert tableCFs Object to Map.
221   * */
222  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
223    if (tableCFs == null || tableCFs.length == 0) {
224      return null;
225    }
226    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
227    for (int i = 0, n = tableCFs.length; i < n; i++) {
228      ReplicationProtos.TableCF tableCF = tableCFs[i];
229      List<String> families = new ArrayList<>();
230      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
231        families.add(tableCF.getFamilies(j).toStringUtf8());
232      }
233      if (families.size() > 0) {
234        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
235      } else {
236        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
237      }
238    }
239
240    return tableCFsMap;
241  }
242
243  /**
244   * @param bytes Content of a peer znode.
245   * @return ClusterKey parsed from the passed bytes.
246   * @throws DeserializationException
247   */
248  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
249      throws DeserializationException {
250    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
251      int pbLen = ProtobufUtil.lengthOfPBMagic();
252      ReplicationProtos.ReplicationPeer.Builder builder =
253          ReplicationProtos.ReplicationPeer.newBuilder();
254      ReplicationProtos.ReplicationPeer peer;
255      try {
256        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
257        peer = builder.build();
258      } catch (IOException e) {
259        throw new DeserializationException(e);
260      }
261      return convert(peer);
262    } else {
263      if (bytes == null || bytes.length <= 0) {
264        throw new DeserializationException("Bytes to deserialize should not be empty.");
265      }
266      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
267    }
268  }
269
  /**
   * Convert a protobuf {@link ReplicationProtos.ReplicationPeer} message into a
   * {@link ReplicationPeerConfig}. Optional proto fields are copied only when present,
   * leaving the builder defaults otherwise.
   * @param peer the protobuf message to convert
   * @return the equivalent peer configuration
   */
  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    if (peer.hasClusterkey()) {
      builder.setClusterKey(peer.getClusterkey());
    }
    if (peer.hasReplicationEndpointImpl()) {
      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
    }

    // Opaque per-peer byte[] key/value data.
    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
    }

    // Per-peer configuration overrides.
    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
      builder.putConfiguration(pair.getName(), pair.getValue());
    }

    // convert2Map returns null for an empty array, in which case the builder default is kept.
    Map<TableName, List<String>> tableCFsMap = convert2Map(
      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
    if (tableCFsMap != null) {
      builder.setTableCFsMap(tableCFsMap);
    }

    List<ByteString> namespacesList = peer.getNamespacesList();
    if (namespacesList != null && namespacesList.size() != 0) {
      builder.setNamespaces(
        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasBandwidth()) {
      builder.setBandwidth(peer.getBandwidth());
    }

    if (peer.hasReplicateAll()) {
      builder.setReplicateAllUserTables(peer.getReplicateAll());
    }

    if (peer.hasSerial()) {
      builder.setSerial(peer.getSerial());
    }

    // Exclusion counterparts of the table-CFs map and namespace set above.
    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
        .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
    if (excludeTableCFsMap != null) {
      builder.setExcludeTableCFsMap(excludeTableCFsMap);
    }

    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
      builder.setExcludeNamespaces(
        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasRemoteWALDir()) {
      builder.setRemoteWALDir(peer.getRemoteWALDir());
    }
    return builder.build();
  }
328
329  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
330    ReplicationProtos.ReplicationPeer.Builder builder =
331        ReplicationProtos.ReplicationPeer.newBuilder();
332    // we used to set cluster key as required so here we must always set it, until we can make sure
333    // that no one uses the old proto file.
334    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
335    if (peerConfig.getReplicationEndpointImpl() != null) {
336      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
337    }
338
339    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
340      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
341          .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
342          .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
343          .build());
344    }
345
346    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
347      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
348          .setName(entry.getKey())
349          .setValue(entry.getValue())
350          .build());
351    }
352
353    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
354    if (tableCFs != null) {
355      for (int i = 0; i < tableCFs.length; i++) {
356        builder.addTableCfs(tableCFs[i]);
357      }
358    }
359    Set<String> namespaces = peerConfig.getNamespaces();
360    if (namespaces != null) {
361      for (String namespace : namespaces) {
362        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
363      }
364    }
365
366    builder.setBandwidth(peerConfig.getBandwidth());
367    builder.setReplicateAll(peerConfig.replicateAllUserTables());
368    builder.setSerial(peerConfig.isSerial());
369
370    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
371    if (excludeTableCFs != null) {
372      for (int i = 0; i < excludeTableCFs.length; i++) {
373        builder.addExcludeTableCfs(excludeTableCFs[i]);
374      }
375    }
376    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
377    if (excludeNamespaces != null) {
378      for (String namespace : excludeNamespaces) {
379        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
380      }
381    }
382
383    if (peerConfig.getRemoteWALDir() != null) {
384      builder.setRemoteWALDir(peerConfig.getRemoteWALDir());
385    }
386    return builder.build();
387  }
388
389  /**
390   * @param peerConfig
391   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
392   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
393   *         /hbase/replication/peers/PEER_ID
394   */
395  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
396    byte[] bytes = convert(peerConfig).toByteArray();
397    return ProtobufUtil.prependPBMagic(bytes);
398  }
399
400  public static ReplicationPeerDescription
401      toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) {
402    boolean enabled =
403        ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
404    ReplicationPeerConfig config = convert(desc.getConfig());
405    return new ReplicationPeerDescription(desc.getId(), enabled, config,
406      toSyncReplicationState(desc.getSyncReplicationState()));
407  }
408
409  public static ReplicationProtos.ReplicationPeerDescription
410      toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
411    ReplicationProtos.ReplicationPeerDescription.Builder builder =
412        ReplicationProtos.ReplicationPeerDescription.newBuilder();
413    builder.setId(desc.getPeerId());
414
415    ReplicationProtos.ReplicationState.Builder stateBuilder =
416        ReplicationProtos.ReplicationState.newBuilder();
417    stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED :
418        ReplicationProtos.ReplicationState.State.DISABLED);
419    builder.setState(stateBuilder.build());
420
421    builder.setConfig(convert(desc.getPeerConfig()));
422    builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState()));
423
424    return builder.build();
425  }
426
427  public static ReplicationProtos.SyncReplicationState
428      toSyncReplicationState(SyncReplicationState state) {
429    ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder =
430        ReplicationProtos.SyncReplicationState.newBuilder();
431    syncReplicationStateBuilder
432        .setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal()));
433    return syncReplicationStateBuilder.build();
434  }
435
436  public static SyncReplicationState
437      toSyncReplicationState(ReplicationProtos.SyncReplicationState state) {
438    return SyncReplicationState.valueOf(state.getState().getNumber());
439  }
440
441  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
442      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
443    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
444    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
445    if (preTableCfs == null) {
446      builder.setTableCFsMap(tableCfs);
447    } else {
448      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
449    }
450    return builder.build();
451  }
452
453  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
454      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
455      throws ReplicationException {
456    if (excludeTableCfs == null) {
457      throw new ReplicationException("exclude tableCfs is null");
458    }
459    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
460    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
461    if (preExcludeTableCfs == null) {
462      builder.setExcludeTableCFsMap(excludeTableCfs);
463    } else {
464      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
465    }
466    return builder.build();
467  }
468
469  private static Map<TableName, List<String>> mergeTableCFs(
470      Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
471    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
472    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
473      TableName table = entry.getKey();
474      Collection<String> appendCfs = entry.getValue();
475      if (newTableCfs.containsKey(table)) {
476        List<String> cfs = newTableCfs.get(table);
477        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
478          newTableCfs.put(table, null);
479        } else {
480          Set<String> cfSet = new HashSet<String>(cfs);
481          cfSet.addAll(appendCfs);
482          newTableCfs.put(table, Lists.newArrayList(cfSet));
483        }
484      } else {
485        if (appendCfs == null || appendCfs.isEmpty()) {
486          newTableCfs.put(table, null);
487        } else {
488          newTableCfs.put(table, Lists.newArrayList(appendCfs));
489        }
490      }
491    }
492    return newTableCfs;
493  }
494
495  private static Map<TableName, List<String>>
496      copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
497    Map<TableName, List<String>> newTableCfs = new HashMap<>();
498    preTableCfs.forEach(
499      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
500    return newTableCfs;
501  }
502
  /**
   * Return a copy of {@code peerConfig} with the given entries removed from its table-CFs map.
   * A {@code null}/empty family list on either side means "all families of the table"; removal
   * is only legal when both sides agree on that meaning.
   * @param tableCfs the entries to remove
   * @param peerConfig the existing peer configuration (not modified)
   * @param id the peer id, used only in exception messages
   * @return a new {@link ReplicationPeerConfig} with the reduced map
   * @throws ReplicationException when the peer has no table-CFs map, when a table to remove is
   *           absent, or when the family specification of a removal does not match the entry
   */
  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig,
      String id) throws ReplicationException {
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          // Both sides mean "all families": drop the whole table entry.
          newTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          // Remove the listed families; drop the entry if none are left.
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newTableCfs.remove(table);
          } else {
            newTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
              + " which doesn't specify cfs from table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
              + " which has specified cfs from table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
            "No table: " + table + " in table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setTableCFsMap(newTableCfs);
    return builder.build();
  }
542
  /**
   * Return a copy of {@code peerConfig} with the given entries removed from its
   * exclude-table-CFs map. A {@code null}/empty family list on either side means "all families
   * of the table"; removal is only legal when both sides agree on that meaning.
   * @param excludeTableCfs the entries to remove, must not be {@code null}
   * @param peerConfig the existing peer configuration (not modified)
   * @param id the peer id, used only in exception messages
   * @return a new {@link ReplicationPeerConfig} with the reduced exclude map
   * @throws ReplicationException when {@code excludeTableCfs} is {@code null}, when the peer has
   *           no exclude map, when a table to remove is absent, or when the family specification
   *           of a removal does not match the entry
   */
  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
      throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newExcludeTableCfs.containsKey(table)) {
        List<String> cfs = newExcludeTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          // Both sides mean "all families": drop the whole table entry.
          newExcludeTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          // Remove the listed families; drop the entry if none are left.
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newExcludeTableCfs.remove(table);
          } else {
            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
              + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
              + " which has specified cfs from exclude-table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
            "No table: " + table + " in exclude-table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setExcludeTableCFsMap(newExcludeTableCfs);
    return builder.build();
  }
585
586  /**
587   * Returns the configuration needed to talk to the remote slave cluster.
588   * @param conf the base configuration
589   * @param peer the description of replication peer
590   * @return the configuration for the peer cluster, null if it was unable to get the configuration
591   * @throws IOException when create peer cluster configuration failed
592   */
593  public static Configuration getPeerClusterConfiguration(Configuration conf,
594      ReplicationPeerDescription peer) throws IOException {
595    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
596    Configuration otherConf;
597    try {
598      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
599    } catch (IOException e) {
600      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
601    }
602
603    if (!peerConfig.getConfiguration().isEmpty()) {
604      CompoundConfiguration compound = new CompoundConfiguration();
605      compound.add(otherConf);
606      compound.addStringMap(peerConfig.getConfiguration());
607      return compound;
608    }
609
610    return otherConf;
611  }
612}