001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.client.replication;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collection;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import java.util.stream.Collectors;
030
031import org.apache.commons.lang3.StringUtils;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.CompoundConfiguration;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.exceptions.DeserializationException;
037import org.apache.hadoop.hbase.replication.ReplicationException;
038import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
043import org.apache.yetus.audience.InterfaceAudience;
044import org.apache.yetus.audience.InterfaceStability;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
049import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
050import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
051import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
054
/**
 * Helper for converting replication peer configuration and table-CFs settings between their
 * string, map and protobuf representations.
 */
058@InterfaceAudience.Private
059@InterfaceStability.Stable
060public final class ReplicationPeerConfigUtil {
061
062  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
063  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
064    "hbase.replication.peer.base.config";
065
066  private ReplicationPeerConfigUtil() {}
067
068  public static String convertToString(Set<String> namespaces) {
069    if (namespaces == null) {
070      return null;
071    }
072    return StringUtils.join(namespaces, ';');
073  }
074
075  /** convert map to TableCFs Object */
076  public static ReplicationProtos.TableCF[] convert(
077      Map<TableName, ? extends Collection<String>> tableCfs) {
078    if (tableCfs == null) {
079      return null;
080    }
081    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
082    ReplicationProtos.TableCF.Builder tableCFBuilder =  ReplicationProtos.TableCF.newBuilder();
083    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
084      tableCFBuilder.clear();
085      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
086      Collection<String> v = entry.getValue();
087      if (v != null && !v.isEmpty()) {
088        for (String value : entry.getValue()) {
089          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
090        }
091      }
092      tableCFList.add(tableCFBuilder.build());
093    }
094    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
095  }
096
097  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
098    if (tableCfs == null) {
099      return null;
100    }
101    return convert(convert(tableCfs));
102  }
103
104  /**
105   *  Convert string to TableCFs Object.
106   *  This is only for read TableCFs information from TableCF node.
107   *  Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
108   * */
109  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
110    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
111      return null;
112    }
113
114    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
115    String[] tables = tableCFsConfig.split(";");
116    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.length);
117
118    for (String tab : tables) {
119      // 1 ignore empty table config
120      tab = tab.trim();
121      if (tab.length() == 0) {
122        continue;
123      }
124      // 2 split to "table" and "cf1,cf2"
125      //   for each table: "table#cf1,cf2" or "table"
126      String[] pair = tab.split(":");
127      String tabName = pair[0].trim();
128      if (pair.length > 2 || tabName.length() == 0) {
129        LOG.info("incorrect format:" + tableCFsConfig);
130        continue;
131      }
132
133      tableCFBuilder.clear();
134      // split namespace from tableName
135      String ns = "default";
136      String tName = tabName;
137      String[] dbs = tabName.split("\\.");
138      if (dbs != null && dbs.length == 2) {
139        ns = dbs[0];
140        tName = dbs[1];
141      }
142      tableCFBuilder.setTableName(
143        ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
144
145      // 3 parse "cf1,cf2" part to List<cf>
146      if (pair.length == 2) {
147        String[] cfsList = pair[1].split(",");
148        for (String cf : cfsList) {
149          String cfName = cf.trim();
150          if (cfName.length() > 0) {
151            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
152          }
153        }
154      }
155      tableCFList.add(tableCFBuilder.build());
156    }
157    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
158  }
159
160  /**
161   *  Convert TableCFs Object to String.
162   *  Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
163   * */
164  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
165    StringBuilder sb = new StringBuilder();
166    for (int i = 0, n = tableCFs.length; i < n; i++) {
167      ReplicationProtos.TableCF tableCF = tableCFs[i];
168      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
169      if (StringUtils.isNotEmpty(namespace)) {
170        sb.append(namespace).append(".").
171            append(tableCF.getTableName().getQualifier().toStringUtf8())
172            .append(":");
173      } else {
174        sb.append(tableCF.getTableName().toString()).append(":");
175      }
176      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
177        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
178      }
179      sb.deleteCharAt(sb.length() - 1).append(";");
180    }
181    if (sb.length() > 0) {
182      sb.deleteCharAt(sb.length() - 1);
183    }
184    return sb.toString();
185  }
186
187  /**
188   *  Get TableCF in TableCFs, if not exist, return null.
189   * */
190  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
191                                           String table) {
192    for (int i = 0, n = tableCFs.length; i < n; i++) {
193      ReplicationProtos.TableCF tableCF = tableCFs[i];
194      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
195        return tableCF;
196      }
197    }
198    return null;
199  }
200
201  /**
202   *  Parse bytes into TableCFs.
203   *  It is used for backward compatibility.
204   *  Old format bytes have no PB_MAGIC Header
205   * */
206  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
207    if (bytes == null) {
208      return null;
209    }
210    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
211  }
212
213  /**
214   *  Convert tableCFs string into Map.
215   * */
216  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
217    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
218    return convert2Map(tableCFs);
219  }
220
221  /**
222   *  Convert tableCFs Object to Map.
223   * */
224  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
225    if (tableCFs == null || tableCFs.length == 0) {
226      return null;
227    }
228    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
229    for (int i = 0, n = tableCFs.length; i < n; i++) {
230      ReplicationProtos.TableCF tableCF = tableCFs[i];
231      List<String> families = new ArrayList<>();
232      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
233        families.add(tableCF.getFamilies(j).toStringUtf8());
234      }
235      if (families.size() > 0) {
236        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
237      } else {
238        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
239      }
240    }
241
242    return tableCFsMap;
243  }
244
245  /**
246   * @param bytes Content of a peer znode.
247   * @return ClusterKey parsed from the passed bytes.
248   * @throws DeserializationException
249   */
250  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
251      throws DeserializationException {
252    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
253      int pbLen = ProtobufUtil.lengthOfPBMagic();
254      ReplicationProtos.ReplicationPeer.Builder builder =
255          ReplicationProtos.ReplicationPeer.newBuilder();
256      ReplicationProtos.ReplicationPeer peer;
257      try {
258        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
259        peer = builder.build();
260      } catch (IOException e) {
261        throw new DeserializationException(e);
262      }
263      return convert(peer);
264    } else {
265      if (bytes == null || bytes.length <= 0) {
266        throw new DeserializationException("Bytes to deserialize should not be empty.");
267      }
268      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
269    }
270  }
271
272  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
273    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
274    if (peer.hasClusterkey()) {
275      builder.setClusterKey(peer.getClusterkey());
276    }
277    if (peer.hasReplicationEndpointImpl()) {
278      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
279    }
280
281    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
282      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
283    }
284
285    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
286      builder.putConfiguration(pair.getName(), pair.getValue());
287    }
288
289    Map<TableName, List<String>> tableCFsMap = convert2Map(
290      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
291    if (tableCFsMap != null) {
292      builder.setTableCFsMap(tableCFsMap);
293    }
294
295    List<ByteString> namespacesList = peer.getNamespacesList();
296    if (namespacesList != null && namespacesList.size() != 0) {
297      builder.setNamespaces(
298        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
299    }
300
301    if (peer.hasBandwidth()) {
302      builder.setBandwidth(peer.getBandwidth());
303    }
304
305    if (peer.hasReplicateAll()) {
306      builder.setReplicateAllUserTables(peer.getReplicateAll());
307    }
308
309    if (peer.hasSerial()) {
310      builder.setSerial(peer.getSerial());
311    }
312
313    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
314        .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
315    if (excludeTableCFsMap != null) {
316      builder.setExcludeTableCFsMap(excludeTableCFsMap);
317    }
318
319    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
320    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
321      builder.setExcludeNamespaces(
322        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
323    }
324
325    return builder.build();
326  }
327
328  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
329    ReplicationProtos.ReplicationPeer.Builder builder =
330        ReplicationProtos.ReplicationPeer.newBuilder();
331    // we used to set cluster key as required so here we must always set it, until we can make sure
332    // that no one uses the old proto file.
333    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
334    if (peerConfig.getReplicationEndpointImpl() != null) {
335      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
336    }
337
338    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
339      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
340          .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
341          .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue()))
342          .build());
343    }
344
345    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
346      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
347          .setName(entry.getKey())
348          .setValue(entry.getValue())
349          .build());
350    }
351
352    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
353    if (tableCFs != null) {
354      for (int i = 0; i < tableCFs.length; i++) {
355        builder.addTableCfs(tableCFs[i]);
356      }
357    }
358    Set<String> namespaces = peerConfig.getNamespaces();
359    if (namespaces != null) {
360      for (String namespace : namespaces) {
361        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
362      }
363    }
364
365    builder.setBandwidth(peerConfig.getBandwidth());
366    builder.setReplicateAll(peerConfig.replicateAllUserTables());
367    builder.setSerial(peerConfig.isSerial());
368
369    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
370    if (excludeTableCFs != null) {
371      for (int i = 0; i < excludeTableCFs.length; i++) {
372        builder.addExcludeTableCfs(excludeTableCFs[i]);
373      }
374    }
375    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
376    if (excludeNamespaces != null) {
377      for (String namespace : excludeNamespaces) {
378        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
379      }
380    }
381
382    return builder.build();
383  }
384
385  /**
386   * @param peerConfig
387   * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
388   *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
389   *         /hbase/replication/peers/PEER_ID
390   */
391  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
392    byte[] bytes = convert(peerConfig).toByteArray();
393    return ProtobufUtil.prependPBMagic(bytes);
394  }
395
396  public static ReplicationPeerDescription toReplicationPeerDescription(
397      ReplicationProtos.ReplicationPeerDescription desc) {
398    boolean enabled = ReplicationProtos.ReplicationState.State.ENABLED == desc.getState()
399        .getState();
400    ReplicationPeerConfig config = convert(desc.getConfig());
401    return new ReplicationPeerDescription(desc.getId(), enabled, config);
402  }
403
404  public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
405      ReplicationPeerDescription desc) {
406    ReplicationProtos.ReplicationPeerDescription.Builder builder =
407        ReplicationProtos.ReplicationPeerDescription.newBuilder();
408    builder.setId(desc.getPeerId());
409    ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
410        .newBuilder();
411    stateBuilder.setState(desc.isEnabled() ? ReplicationProtos.ReplicationState.State.ENABLED
412        : ReplicationProtos.ReplicationState.State.DISABLED);
413    builder.setState(stateBuilder.build());
414    builder.setConfig(convert(desc.getPeerConfig()));
415    return builder.build();
416  }
417
418  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
419      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
420    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
421    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
422    if (preTableCfs == null) {
423      builder.setTableCFsMap(tableCfs);
424    } else {
425      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
426    }
427    return builder.build();
428  }
429
430  /**
431   * Helper method to add base peer configs from Configuration to ReplicationPeerConfig
432   * if not present in latter.
433   *
434   * This merges the user supplied peer configuration
435   * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs
436   * provided as property hbase.replication.peer.base.configs in hbase configuration.
437   * Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value
438   * of conf is retained if already present in ReplicationPeerConfig.
439   *
440   * @param conf Configuration
441   * @return ReplicationPeerConfig containing updated configs.
442   */
443  public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf,
444    ReplicationPeerConfig receivedPeerConfig) {
445    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
446    ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.
447      newBuilder(receivedPeerConfig);
448    Map<String,String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
449
450    if (basePeerConfigs.length() != 0) {
451      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
452        .withKeyValueSeparator("=").split(basePeerConfigs);
453      for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) {
454        String configName = entry.getKey();
455        String configValue = entry.getValue();
456        // Only override if base config does not exist in existing peer configs
457        if (!receivedPeerConfigMap.containsKey(configName)) {
458          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
459        }
460      }
461    }
462    return copiedPeerConfigBuilder.build();
463  }
464
465  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
466      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
467      throws ReplicationException {
468    if (excludeTableCfs == null) {
469      throw new ReplicationException("exclude tableCfs is null");
470    }
471    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
472    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
473    if (preExcludeTableCfs == null) {
474      builder.setExcludeTableCFsMap(excludeTableCfs);
475    } else {
476      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
477    }
478    return builder.build();
479  }
480
481  private static Map<TableName, List<String>> mergeTableCFs(
482      Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
483    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
484    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
485      TableName table = entry.getKey();
486      Collection<String> appendCfs = entry.getValue();
487      if (newTableCfs.containsKey(table)) {
488        List<String> cfs = newTableCfs.get(table);
489        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
490          newTableCfs.put(table, null);
491        } else {
492          Set<String> cfSet = new HashSet<String>(cfs);
493          cfSet.addAll(appendCfs);
494          newTableCfs.put(table, Lists.newArrayList(cfSet));
495        }
496      } else {
497        if (appendCfs == null || appendCfs.isEmpty()) {
498          newTableCfs.put(table, null);
499        } else {
500          newTableCfs.put(table, Lists.newArrayList(appendCfs));
501        }
502      }
503    }
504    return newTableCfs;
505  }
506
507  private static Map<TableName, List<String>>
508      copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
509    Map<TableName, List<String>> newTableCfs = new HashMap<>();
510    preTableCfs.forEach(
511      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
512    return newTableCfs;
513  }
514
515  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
516      Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig,
517      String id) throws ReplicationException {
518    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
519    if (preTableCfs == null) {
520      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
521    }
522    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
523    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
524      TableName table = entry.getKey();
525      Collection<String> removeCfs = entry.getValue();
526      if (newTableCfs.containsKey(table)) {
527        List<String> cfs = newTableCfs.get(table);
528        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
529          newTableCfs.remove(table);
530        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
531          Set<String> cfSet = new HashSet<String>(cfs);
532          cfSet.removeAll(removeCfs);
533          if (cfSet.isEmpty()) {
534            newTableCfs.remove(table);
535          } else {
536            newTableCfs.put(table, Lists.newArrayList(cfSet));
537          }
538        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
539          throw new ReplicationException("Cannot remove cf of table: " + table
540              + " which doesn't specify cfs from table-cfs config in peer: " + id);
541        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
542          throw new ReplicationException("Cannot remove table: " + table
543              + " which has specified cfs from table-cfs config in peer: " + id);
544        }
545      } else {
546        throw new ReplicationException(
547            "No table: " + table + " in table-cfs config of peer: " + id);
548      }
549    }
550    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
551    builder.setTableCFsMap(newTableCfs);
552    return builder.build();
553  }
554
555  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
556      Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
557      throws ReplicationException {
558    if (excludeTableCfs == null) {
559      throw new ReplicationException("exclude tableCfs is null");
560    }
561    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
562    if (preExcludeTableCfs == null) {
563      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
564    }
565    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
566    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
567      TableName table = entry.getKey();
568      Collection<String> removeCfs = entry.getValue();
569      if (newExcludeTableCfs.containsKey(table)) {
570        List<String> cfs = newExcludeTableCfs.get(table);
571        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
572          newExcludeTableCfs.remove(table);
573        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
574          Set<String> cfSet = new HashSet<String>(cfs);
575          cfSet.removeAll(removeCfs);
576          if (cfSet.isEmpty()) {
577            newExcludeTableCfs.remove(table);
578          } else {
579            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
580          }
581        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
582          throw new ReplicationException("Cannot remove cf of table: " + table
583              + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
584        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
585          throw new ReplicationException("Cannot remove table: " + table
586              + " which has specified cfs from exclude-table-cfs config in peer: " + id);
587        }
588      } else {
589        throw new ReplicationException(
590            "No table: " + table + " in exclude-table-cfs config of peer: " + id);
591      }
592    }
593    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
594    builder.setExcludeTableCFsMap(newExcludeTableCfs);
595    return builder.build();
596  }
597
598  /**
599   * Returns the configuration needed to talk to the remote slave cluster.
600   * @param conf the base configuration
601   * @param peer the description of replication peer
602   * @return the configuration for the peer cluster, null if it was unable to get the configuration
603   * @throws IOException when create peer cluster configuration failed
604   */
605  public static Configuration getPeerClusterConfiguration(Configuration conf,
606      ReplicationPeerDescription peer) throws IOException {
607    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
608    Configuration otherConf;
609    try {
610      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
611    } catch (IOException e) {
612      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
613    }
614
615    if (!peerConfig.getConfiguration().isEmpty()) {
616      CompoundConfiguration compound = new CompoundConfiguration();
617      compound.add(otherConf);
618      compound.addStringMap(peerConfig.getConfiguration());
619      return compound;
620    }
621    return otherConf;
622  }
623}