001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client.replication;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.stream.Collectors;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CompoundConfiguration;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.exceptions.DeserializationException;
037import org.apache.hadoop.hbase.replication.ReplicationException;
038import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.yetus.audience.InterfaceStability;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
049import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
053
054/**
055 * Helper for TableCFs Operations.
056 */
057@InterfaceAudience.Private
058@InterfaceStability.Stable
059public final class ReplicationPeerConfigUtil {
060
  // Class-wide SLF4J logger.
  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);

  /** Not instantiable: this class only offers static helper methods. */
  private ReplicationPeerConfigUtil() {}
064
065  public static String convertToString(Set<String> namespaces) {
066    if (namespaces == null) {
067      return null;
068    }
069    return StringUtils.join(namespaces, ';');
070  }
071
072  /** convert map to TableCFs Object */
073  public static ReplicationProtos.TableCF[] convert(
074      Map<TableName, ? extends Collection<String>> tableCfs) {
075    if (tableCfs == null) {
076      return null;
077    }
078    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
079    ReplicationProtos.TableCF.Builder tableCFBuilder =  ReplicationProtos.TableCF.newBuilder();
080    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
081      tableCFBuilder.clear();
082      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
083      Collection<String> v = entry.getValue();
084      if (v != null && !v.isEmpty()) {
085        for (String value : entry.getValue()) {
086          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
087        }
088      }
089      tableCFList.add(tableCFBuilder.build());
090    }
091    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
092  }
093
094  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
095    if (tableCfs == null) {
096      return null;
097    }
098    return convert(convert(tableCfs));
099  }
100
101  /**
102   *  Convert string to TableCFs Object.
103   *  This is only for read TableCFs information from TableCF node.
104   *  Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
105   * */
106  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
107    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
108      return null;
109    }
110
111    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
112    String[] tables = tableCFsConfig.split(";");
113    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);
114
115    for (String tab : tables) {
116      // 1 ignore empty table config
117      tab = tab.trim();
118      if (tab.length() == 0) {
119        continue;
120      }
121      // 2 split to "table" and "cf1,cf2"
122      //   for each table: "table#cf1,cf2" or "table"
123      String[] pair = tab.split(":");
124      String tabName = pair[0].trim();
125      if (pair.length > 2 || tabName.length() == 0) {
126        LOG.info("incorrect format:" + tableCFsConfig);
127        continue;
128      }
129
130      tableCFBuilder.clear();
131      // split namespace from tableName
132      String ns = "default";
133      String tName = tabName;
134      String[] dbs = tabName.split("\\.");
135      if (dbs != null && dbs.length == 2) {
136        ns = dbs[0];
137        tName = dbs[1];
138      }
139      tableCFBuilder.setTableName(
140        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
141
142      // 3 parse "cf1,cf2" part to List<cf>
143      if (pair.length == 2) {
144        String[] cfsList = pair[1].split(",");
145        for (String cf : cfsList) {
146          String cfName = cf.trim();
147          if (cfName.length() > 0) {
148            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
149          }
150        }
151      }
152      tableCFList.add(tableCFBuilder.build());
153    }
154    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
155  }
156
157  /**
158   *  Convert TableCFs Object to String.
159   *  Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
160   * */
161  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
162    StringBuilder sb = new StringBuilder();
163    for (int i = 0, n = tableCFs.length; i < n; i++) {
164      ReplicationProtos.TableCF tableCF = tableCFs[i];
165      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
166      if (StringUtils.isNotEmpty(namespace)) {
167        sb.append(namespace).append(".").
168            append(tableCF.getTableName().getQualifier().toStringUtf8())
169            .append(":");
170      } else {
171        sb.append(tableCF.getTableName().toString()).append(":");
172      }
173      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
174        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
175      }
176      sb.deleteCharAt(sb.length() - 1).append(";");
177    }
178    if (sb.length() > 0) {
179      sb.deleteCharAt(sb.length() - 1);
180    }
181    return sb.toString();
182  }
183
184  /**
185   *  Get TableCF in TableCFs, if not exist, return null.
186   * */
187  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
188                                           String table) {
189    for (int i = 0, n = tableCFs.length; i < n; i++) {
190      ReplicationProtos.TableCF tableCF = tableCFs[i];
191      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
192        return tableCF;
193      }
194    }
195    return null;
196  }
197
198  /**
199   *  Parse bytes into TableCFs.
200   *  It is used for backward compatibility.
201   *  Old format bytes have no PB_MAGIC Header
202   * */
203  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
204    if (bytes == null) {
205      return null;
206    }
207    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
208  }
209
210  /**
211   *  Convert tableCFs string into Map.
212   * */
213  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
214    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
215    return convert2Map(tableCFs);
216  }
217
218  /**
219   *  Convert tableCFs Object to Map.
220   * */
221  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
222    if (tableCFs == null || tableCFs.length == 0) {
223      return null;
224    }
225    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
226    for (int i = 0, n = tableCFs.length; i < n; i++) {
227      ReplicationProtos.TableCF tableCF = tableCFs[i];
228      List<String> families = new ArrayList<>();
229      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
230        families.add(tableCF.getFamilies(j).toStringUtf8());
231      }
232      if (families.size() > 0) {
233        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
234      } else {
235        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
236      }
237    }
238
239    return tableCFsMap;
240  }
241
242  /**
243   * @param bytes Content of a peer znode.
244   * @return ClusterKey parsed from the passed bytes.
245   * @throws DeserializationException
246   */
247  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
248      throws DeserializationException {
249    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
250      int pbLen = ProtobufUtil.lengthOfPBMagic();
251      ReplicationProtos.ReplicationPeer.Builder builder =
252          ReplicationProtos.ReplicationPeer.newBuilder();
253      ReplicationProtos.ReplicationPeer peer;
254      try {
255        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
256        peer = builder.build();
257      } catch (IOException e) {
258        throw new DeserializationException(e);
259      }
260      return convert(peer);
261    } else {
262      if (bytes == null || bytes.length <= 0) {
263        throw new DeserializationException("Bytes to deserialize should not be empty.");
264      }
265      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
266    }
267  }
268
269  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
270    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
271    if (peer.hasClusterkey()) {
272      builder.setClusterKey(peer.getClusterkey());
273    }
274    if (peer.hasReplicationEndpointImpl()) {
275      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
276    }
277
278    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
279      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
280    }
281
282    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
283      builder.putConfiguration(pair.getName(), pair.getValue());
284    }
285
286    Map<TableName, List<String>> tableCFsMap = convert2Map(
287      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
288    if (tableCFsMap != null) {
289      builder.setTableCFsMap(tableCFsMap);
290    }
291
292    List<ByteString> namespacesList = peer.getNamespacesList();
293    if (namespacesList != null && namespacesList.size() != 0) {
294      builder.setNamespaces(
295        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
296    }
297
298    if (peer.hasBandwidth()) {
299      builder.setBandwidth(peer.getBandwidth());
300    }
301
302    if (peer.hasReplicateAll()) {
303      builder.setReplicateAllUserTables(peer.getReplicateAll());
304    }
305
306    if (peer.hasSerial()) {
307      builder.setSerial(peer.getSerial());
308    }
309
310    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
311        .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
312    if (excludeTableCFsMap != null) {
313      builder.setExcludeTableCFsMap(excludeTableCFsMap);
314    }
315
316    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
317    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
318      builder.setExcludeNamespaces(
319        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
320    }
321
322    return builder.build();
323  }
324
325  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
326    ReplicationProtos.ReplicationPeer.Builder builder =
327        ReplicationProtos.ReplicationPeer.newBuilder();
328    // we used to set cluster key as required so here we must always set it, until we can make sure
329    // that no one uses the old proto file.
330    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
331    if (peerConfig.getReplicationEndpointImpl() != null) {
332      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
333    }
334
335    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
336      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
337          .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
338          .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
339          .build());
340    }
341
342    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
343      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
344          .setName(entry.getKey())
345          .setValue(entry.getValue())
346          .build());
347    }
348
349    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
350    if (tableCFs != null) {
351      for (int i = 0; i < tableCFs.length; i++) {
352        builder.addTableCfs(tableCFs[i]);
353      }
354    }
355    Set<String> namespaces = peerConfig.getNamespaces();
356    if (namespaces != null) {
357      for (String namespace : namespaces) {
358        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
359      }
360    }
361
362    builder.setBandwidth(peerConfig.getBandwidth());
363    builder.setReplicateAll(peerConfig.replicateAllUserTables());
364    builder.setSerial(peerConfig.isSerial());
365
366    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
367    if (excludeTableCFs != null) {
368      for (int i = 0; i < excludeTableCFs.length; i++) {
369        builder.addExcludeTableCfs(excludeTableCFs[i]);
370      }
371    }
372    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
373    if (excludeNamespaces != null) {
374      for (String namespace : excludeNamespaces) {
375        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
376      }
377    }
378
379    return builder.build();
380  }
381
382  /**
383   * @param peerConfig
384   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
385   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
386   *         /hbase/replication/peers/PEER_ID
387   */
388  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
389    byte[] bytes = convert(peerConfig).toByteArray();
390    return ProtobufUtil.prependPBMagic(bytes);
391  }
392
393  public static ReplicationPeerDescription toReplicationPeerDescription(
394      ReplicationProtos.ReplicationPeerDescription desc) {
395    boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
396        .getState();
397    ReplicationPeerConfig config = convert(desc.getConfig());
398    return new ReplicationPeerDescription(desc.getId(), enabled, config);
399  }
400
401  public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
402      ReplicationPeerDescription desc) {
403    ReplicationProtos.ReplicationPeerDescription.Builder builder =
404        ReplicationProtos.ReplicationPeerDescription.newBuilder();
405    builder.setId(desc.getPeerId());
406    ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
407        .newBuilder();
408    stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
409        : ReplicationProtos.ReplicationState.State.DISABLED);
410    builder.setState(stateBuilder.build());
411    builder.setConfig(convert(desc.getPeerConfig()));
412    return builder.build();
413  }
414
415  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
416      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
417    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
418    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
419    if (preTableCfs == null) {
420      builder.setTableCFsMap(tableCfs);
421    } else {
422      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
423    }
424    return builder.build();
425  }
426
427  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
428      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
429      throws ReplicationException {
430    if (excludeTableCfs == null) {
431      throw new ReplicationException("exclude tableCfs is null");
432    }
433    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
434    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
435    if (preExcludeTableCfs == null) {
436      builder.setExcludeTableCFsMap(excludeTableCfs);
437    } else {
438      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
439    }
440    return builder.build();
441  }
442
443  private static Map<TableName, List<String>> mergeTableCFs(
444      Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
445    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
446    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
447      TableName table = entry.getKey();
448      Collection<String> appendCfs = entry.getValue();
449      if (newTableCfs.containsKey(table)) {
450        List<String> cfs = newTableCfs.get(table);
451        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
452          newTableCfs.put(table, null);
453        } else {
454          Set<String> cfSet = new HashSet<String>(cfs);
455          cfSet.addAll(appendCfs);
456          newTableCfs.put(table, Lists.newArrayList(cfSet));
457        }
458      } else {
459        if (appendCfs == null || appendCfs.isEmpty()) {
460          newTableCfs.put(table, null);
461        } else {
462          newTableCfs.put(table, Lists.newArrayList(appendCfs));
463        }
464      }
465    }
466    return newTableCfs;
467  }
468
469  private static Map<TableName, List<String>>
470      copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
471    Map<TableName, List<String>> newTableCfs = new HashMap<>();
472    preTableCfs.forEach(
473      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
474    return newTableCfs;
475  }
476
477  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
478      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig,
479      String id) throws ReplicationException {
480    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
481    if (preTableCfs == null) {
482      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
483    }
484    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
485    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
486      TableName table = entry.getKey();
487      Collection<String> removeCfs = entry.getValue();
488      if (newTableCfs.containsKey(table)) {
489        List<String> cfs = newTableCfs.get(table);
490        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
491          newTableCfs.remove(table);
492        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
493          Set<String> cfSet = new HashSet<String>(cfs);
494          cfSet.removeAll(removeCfs);
495          if (cfSet.isEmpty()) {
496            newTableCfs.remove(table);
497          } else {
498            newTableCfs.put(table, Lists.newArrayList(cfSet));
499          }
500        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
501          throw new ReplicationException("Cannot remove cf of table: " + table
502              + " which doesn't specify cfs from table-cfs config in peer: " + id);
503        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
504          throw new ReplicationException("Cannot remove table: " + table
505              + " which has specified cfs from table-cfs config in peer: " + id);
506        }
507      } else {
508        throw new ReplicationException(
509            "No table: " + table + " in table-cfs config of peer: " + id);
510      }
511    }
512    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
513    builder.setTableCFsMap(newTableCfs);
514    return builder.build();
515  }
516
517  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
518      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
519      throws ReplicationException {
520    if (excludeTableCfs == null) {
521      throw new ReplicationException("exclude tableCfs is null");
522    }
523    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
524    if (preExcludeTableCfs == null) {
525      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
526    }
527    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
528    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
529      TableName table = entry.getKey();
530      Collection<String> removeCfs = entry.getValue();
531      if (newExcludeTableCfs.containsKey(table)) {
532        List<String> cfs = newExcludeTableCfs.get(table);
533        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
534          newExcludeTableCfs.remove(table);
535        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
536          Set<String> cfSet = new HashSet<String>(cfs);
537          cfSet.removeAll(removeCfs);
538          if (cfSet.isEmpty()) {
539            newExcludeTableCfs.remove(table);
540          } else {
541            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
542          }
543        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
544          throw new ReplicationException("Cannot remove cf of table: " + table
545              + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
546        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
547          throw new ReplicationException("Cannot remove table: " + table
548              + " which has specified cfs from exclude-table-cfs config in peer: " + id);
549        }
550      } else {
551        throw new ReplicationException(
552            "No table: " + table + " in exclude-table-cfs config of peer: " + id);
553      }
554    }
555    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
556    builder.setExcludeTableCFsMap(newExcludeTableCfs);
557    return builder.build();
558  }
559
560  /**
561   * Returns the configuration needed to talk to the remote slave cluster.
562   * @param conf the base configuration
563   * @param peer the description of replication peer
564   * @return the configuration for the peer cluster, null if it was unable to get the configuration
565   * @throws IOException when create peer cluster configuration failed
566   */
567  public static Configuration getPeerClusterConfiguration(Configuration conf,
568      ReplicationPeerDescription peer) throws IOException {
569    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
570    Configuration otherConf;
571    try {
572      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
573    } catch (IOException e) {
574      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
575    }
576
577    if (!peerConfig.getConfiguration().isEmpty()) {
578      CompoundConfiguration compound = new CompoundConfiguration();
579      compound.add(otherConf);
580      compound.addStringMap(peerConfig.getConfiguration());
581      return compound;
582    }
583
584    return otherConf;
585  }
586}