001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client.replication;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.stream.Collectors;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CompoundConfiguration;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.exceptions.DeserializationException;
037import org.apache.hadoop.hbase.replication.ReplicationException;
038import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.yetus.audience.InterfaceAudience;
043import org.apache.yetus.audience.InterfaceStability;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
048import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
049import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
050import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
053
054/**
055 * Helper for TableCFs Operations.
056 */
057@InterfaceAudience.Private
058@InterfaceStability.Stable
059public final class ReplicationPeerConfigUtil {
060
061  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
062
063  private ReplicationPeerConfigUtil() {}
064
065  public static String convertToString(Set<String> namespaces) {
066    if (namespaces == null) {
067      return null;
068    }
069    return StringUtils.join(namespaces, ';');
070  }
071
072  /** convert map to TableCFs Object */
073  public static ReplicationProtos.TableCF[] convert(
074      Map<TableName, ? extends Collection<String>> tableCfs) {
075    if (tableCfs == null) {
076      return null;
077    }
078    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
079    ReplicationProtos.TableCF.Builder tableCFBuilder =  ReplicationProtos.TableCF.newBuilder();
080    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
081      tableCFBuilder.clear();
082      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
083      Collection<String> v = entry.getValue();
084      if (v != null && !v.isEmpty()) {
085        for (String value : entry.getValue()) {
086          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
087        }
088      }
089      tableCFList.add(tableCFBuilder.build());
090    }
091    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
092  }
093
094  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
095    if (tableCfs == null) {
096      return null;
097    }
098    return convert(convert(tableCfs));
099  }
100
101  /**
102   *  Convert string to TableCFs Object.
103   *  This is only for read TableCFs information from TableCF node.
104   *  Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
105   * */
106  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
107    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
108      return null;
109    }
110
111    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
112    String[] tables = tableCFsConfig.split(";");
113    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);
114
115    for (String tab : tables) {
116      // 1 ignore empty table config
117      tab = tab.trim();
118      if (tab.length() == 0) {
119        continue;
120      }
121      // 2 split to "table" and "cf1,cf2"
122      //   for each table: "table#cf1,cf2" or "table"
123      String[] pair = tab.split(":");
124      String tabName = pair[0].trim();
125      if (pair.length > 2 || tabName.length() == 0) {
126        LOG.info("incorrect format:" + tableCFsConfig);
127        continue;
128      }
129
130      tableCFBuilder.clear();
131      // split namespace from tableName
132      String ns = "default";
133      String tName = tabName;
134      String[] dbs = tabName.split("\\.");
135      if (dbs != null && dbs.length == 2) {
136        ns = dbs[0];
137        tName = dbs[1];
138      }
139      tableCFBuilder.setTableName(
140        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
141
142      // 3 parse "cf1,cf2" part to List<cf>
143      if (pair.length == 2) {
144        String[] cfsList = pair[1].split(",");
145        for (String cf : cfsList) {
146          String cfName = cf.trim();
147          if (cfName.length() > 0) {
148            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
149          }
150        }
151      }
152      tableCFList.add(tableCFBuilder.build());
153    }
154    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
155  }
156
157  /**
158   *  Convert TableCFs Object to String.
159   *  Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
160   * */
161  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
162    StringBuilder sb = new StringBuilder();
163    for (int i = 0, n = tableCFs.length; i < n; i++) {
164      ReplicationProtos.TableCF tableCF = tableCFs[i];
165      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
166      if (StringUtils.isNotEmpty(namespace)) {
167        sb.append(namespace).append(".").
168            append(tableCF.getTableName().getQualifier().toStringUtf8())
169            .append(":");
170      } else {
171        sb.append(tableCF.getTableName().toString()).append(":");
172      }
173      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
174        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
175      }
176      sb.deleteCharAt(sb.length() - 1).append(";");
177    }
178    if (sb.length() > 0) {
179      sb.deleteCharAt(sb.length() - 1);
180    }
181    return sb.toString();
182  }
183
184  /**
185   *  Get TableCF in TableCFs, if not exist, return null.
186   * */
187  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
188                                           String table) {
189    for (int i = 0, n = tableCFs.length; i < n; i++) {
190      ReplicationProtos.TableCF tableCF = tableCFs[i];
191      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
192        return tableCF;
193      }
194    }
195    return null;
196  }
197
198  /**
199   *  Parse bytes into TableCFs.
200   *  It is used for backward compatibility.
201   *  Old format bytes have no PB_MAGIC Header
202   * */
203  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
204    if (bytes == null) {
205      return null;
206    }
207    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
208  }
209
210  /**
211   *  Convert tableCFs string into Map.
212   * */
213  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
214    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
215    return convert2Map(tableCFs);
216  }
217
218  /**
219   *  Convert tableCFs Object to Map.
220   * */
221  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
222    if (tableCFs == null || tableCFs.length == 0) {
223      return null;
224    }
225    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
226    for (int i = 0, n = tableCFs.length; i < n; i++) {
227      ReplicationProtos.TableCF tableCF = tableCFs[i];
228      List<String> families = new ArrayList<>();
229      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
230        families.add(tableCF.getFamilies(j).toStringUtf8());
231      }
232      if (families.size() > 0) {
233        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
234      } else {
235        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
236      }
237    }
238
239    return tableCFsMap;
240  }
241
242  /**
243   * @param bytes Content of a peer znode.
244   * @return ClusterKey parsed from the passed bytes.
245   * @throws DeserializationException
246   */
247  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
248      throws DeserializationException {
249    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
250      int pblen = ProtobufUtil.lengthOfPBMagic();
251      ReplicationProtos.ReplicationPeer.Builder builder =
252          ReplicationProtos.ReplicationPeer.newBuilder();
253      ReplicationProtos.ReplicationPeer peer;
254      try {
255        ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
256        peer = builder.build();
257      } catch (IOException e) {
258        throw new DeserializationException(e);
259      }
260      return convert(peer);
261    } else {
262      if (bytes.length > 0) {
263        return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
264      }
265      return ReplicationPeerConfig.newBuilder().setClusterKey("").build();
266    }
267  }
268
269  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
270    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
271    if (peer.hasClusterkey()) {
272      builder.setClusterKey(peer.getClusterkey());
273    }
274    if (peer.hasReplicationEndpointImpl()) {
275      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
276    }
277
278    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
279      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
280    }
281
282    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
283      builder.putConfiguration(pair.getName(), pair.getValue());
284    }
285
286    Map<TableName, List<String>> tableCFsMap = convert2Map(
287      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
288    if (tableCFsMap != null) {
289      builder.setTableCFsMap(tableCFsMap);
290    }
291
292    List<ByteString> namespacesList = peer.getNamespacesList();
293    if (namespacesList != null && namespacesList.size() != 0) {
294      builder.setNamespaces(
295        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
296    }
297
298    if (peer.hasBandwidth()) {
299      builder.setBandwidth(peer.getBandwidth());
300    }
301
302    if (peer.hasReplicateAll()) {
303      builder.setReplicateAllUserTables(peer.getReplicateAll());
304    }
305
306    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
307        .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
308    if (excludeTableCFsMap != null) {
309      builder.setExcludeTableCFsMap(excludeTableCFsMap);
310    }
311
312    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
313    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
314      builder.setExcludeNamespaces(
315        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
316    }
317
318    return builder.build();
319  }
320
321  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
322    ReplicationProtos.ReplicationPeer.Builder builder =
323        ReplicationProtos.ReplicationPeer.newBuilder();
324    if (peerConfig.getClusterKey() != null) {
325      builder.setClusterkey(peerConfig.getClusterKey());
326    }
327    if (peerConfig.getReplicationEndpointImpl() != null) {
328      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
329    }
330
331    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
332      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
333          .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
334          .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
335          .build());
336    }
337
338    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
339      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
340          .setName(entry.getKey())
341          .setValue(entry.getValue())
342          .build());
343    }
344
345    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
346    if (tableCFs != null) {
347      for (int i = 0; i < tableCFs.length; i++) {
348        builder.addTableCfs(tableCFs[i]);
349      }
350    }
351    Set<String> namespaces = peerConfig.getNamespaces();
352    if (namespaces != null) {
353      for (String namespace : namespaces) {
354        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
355      }
356    }
357
358    builder.setBandwidth(peerConfig.getBandwidth());
359    builder.setReplicateAll(peerConfig.replicateAllUserTables());
360
361    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
362    if (excludeTableCFs != null) {
363      for (int i = 0; i < excludeTableCFs.length; i++) {
364        builder.addExcludeTableCfs(excludeTableCFs[i]);
365      }
366    }
367    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
368    if (excludeNamespaces != null) {
369      for (String namespace : excludeNamespaces) {
370        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
371      }
372    }
373
374    return builder.build();
375  }
376
377  /**
378   * @param peerConfig
379   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
380   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
381   *         /hbase/replication/peers/PEER_ID
382   */
383  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
384    byte[] bytes = convert(peerConfig).toByteArray();
385    return ProtobufUtil.prependPBMagic(bytes);
386  }
387
388  public static ReplicationPeerDescription toReplicationPeerDescription(
389      ReplicationProtos.ReplicationPeerDescription desc) {
390    boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
391        .getState();
392    ReplicationPeerConfig config = convert(desc.getConfig());
393    return new ReplicationPeerDescription(desc.getId(), enabled, config);
394  }
395
396  public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
397      ReplicationPeerDescription desc) {
398    ReplicationProtos.ReplicationPeerDescription.Builder builder =
399        ReplicationProtos.ReplicationPeerDescription.newBuilder();
400    builder.setId(desc.getPeerId());
401    ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
402        .newBuilder();
403    stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
404        : ReplicationProtos.ReplicationState.State.DISABLED);
405    builder.setState(stateBuilder.build());
406    builder.setConfig(convert(desc.getPeerConfig()));
407    return builder.build();
408  }
409
410  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
411      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
412    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
413    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
414    if (preTableCfs == null) {
415      builder.setTableCFsMap(tableCfs);
416    } else {
417      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
418    }
419    return builder.build();
420  }
421
422  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
423      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
424      throws ReplicationException {
425    if (excludeTableCfs == null) {
426      throw new ReplicationException("exclude tableCfs is null");
427    }
428    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
429    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
430    if (preExcludeTableCfs == null) {
431      builder.setExcludeTableCFsMap(excludeTableCfs);
432    } else {
433      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
434    }
435    return builder.build();
436  }
437
438  private static Map<TableName, List<String>> mergeTableCFs(
439      Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
440    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
441    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
442      TableName table = entry.getKey();
443      Collection<String> appendCfs = entry.getValue();
444      if (newTableCfs.containsKey(table)) {
445        List<String> cfs = newTableCfs.get(table);
446        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
447          newTableCfs.put(table, null);
448        } else {
449          Set<String> cfSet = new HashSet<String>(cfs);
450          cfSet.addAll(appendCfs);
451          newTableCfs.put(table, Lists.newArrayList(cfSet));
452        }
453      } else {
454        if (appendCfs == null || appendCfs.isEmpty()) {
455          newTableCfs.put(table, null);
456        } else {
457          newTableCfs.put(table, Lists.newArrayList(appendCfs));
458        }
459      }
460    }
461    return newTableCfs;
462  }
463
464  private static Map<TableName, List<String>>
465      copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
466    Map<TableName, List<String>> newTableCfs = new HashMap<>();
467    preTableCfs.forEach(
468      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
469    return newTableCfs;
470  }
471
472  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
473      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig,
474      String id) throws ReplicationException {
475    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
476    if (preTableCfs == null) {
477      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
478    }
479    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
480    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
481      TableName table = entry.getKey();
482      Collection<String> removeCfs = entry.getValue();
483      if (newTableCfs.containsKey(table)) {
484        List<String> cfs = newTableCfs.get(table);
485        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
486          newTableCfs.remove(table);
487        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
488          Set<String> cfSet = new HashSet<String>(cfs);
489          cfSet.removeAll(removeCfs);
490          if (cfSet.isEmpty()) {
491            newTableCfs.remove(table);
492          } else {
493            newTableCfs.put(table, Lists.newArrayList(cfSet));
494          }
495        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
496          throw new ReplicationException("Cannot remove cf of table: " + table
497              + " which doesn't specify cfs from table-cfs config in peer: " + id);
498        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
499          throw new ReplicationException("Cannot remove table: " + table
500              + " which has specified cfs from table-cfs config in peer: " + id);
501        }
502      } else {
503        throw new ReplicationException(
504            "No table: " + table + " in table-cfs config of peer: " + id);
505      }
506    }
507    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
508    builder.setTableCFsMap(newTableCfs);
509    return builder.build();
510  }
511
512  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
513      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
514      throws ReplicationException {
515    if (excludeTableCfs == null) {
516      throw new ReplicationException("exclude tableCfs is null");
517    }
518    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
519    if (preExcludeTableCfs == null) {
520      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
521    }
522    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
523    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
524      TableName table = entry.getKey();
525      Collection<String> removeCfs = entry.getValue();
526      if (newExcludeTableCfs.containsKey(table)) {
527        List<String> cfs = newExcludeTableCfs.get(table);
528        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
529          newExcludeTableCfs.remove(table);
530        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
531          Set<String> cfSet = new HashSet<String>(cfs);
532          cfSet.removeAll(removeCfs);
533          if (cfSet.isEmpty()) {
534            newExcludeTableCfs.remove(table);
535          } else {
536            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
537          }
538        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
539          throw new ReplicationException("Cannot remove cf of table: " + table
540              + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
541        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
542          throw new ReplicationException("Cannot remove table: " + table
543              + " which has specified cfs from exclude-table-cfs config in peer: " + id);
544        }
545      } else {
546        throw new ReplicationException(
547            "No table: " + table + " in exclude-table-cfs config of peer: " + id);
548      }
549    }
550    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
551    builder.setExcludeTableCFsMap(newExcludeTableCfs);
552    return builder.build();
553  }
554
555  /**
556   * Returns the configuration needed to talk to the remote slave cluster.
557   * @param conf the base configuration
558   * @param peer the description of replication peer
559   * @return the configuration for the peer cluster, null if it was unable to get the configuration
560   * @throws IOException when create peer cluster configuration failed
561   */
562  public static Configuration getPeerClusterConfiguration(Configuration conf,
563      ReplicationPeerDescription peer) throws IOException {
564    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
565    Configuration otherConf;
566    try {
567      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
568    } catch (IOException e) {
569      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
570    }
571
572    if (!peerConfig.getConfiguration().isEmpty()) {
573      CompoundConfiguration compound = new CompoundConfiguration();
574      compound.add(otherConf);
575      compound.addStringMap(peerConfig.getConfiguration());
576      return compound;
577    }
578
579    return otherConf;
580  }
581}