/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Helper for TableCFs Operations.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public final class ReplicationPeerConfigUtil {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
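  /**
   * Configuration key for base (default) replication peer configs. The value is expected in the
   * form {@code k1=v1;k2=v2,v2_1;k3=""}; see
   * {@link #updateReplicationBasePeerConfigs(Configuration, ReplicationPeerConfig)} for how these
   * entries are merged into each peer's configuration.
   */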
  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
    "hbase.replication.peer.base.config";

  private ReplicationPeerConfigUtil() {
  }

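  /**
   * Convert a set of namespace names to a semicolon-delimited string; returns null if the set is
   * null.
   */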
  public static String convertToString(Set<String> namespaces) {
    if (namespaces == null) {
      return null;
    }
    return StringUtils.join(namespaces, ';');
  }

  /** Convert map to TableCFs Object. */
  public static ReplicationProtos.TableCF[]
    convert(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      tableCFBuilder.clear();
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
      Collection<String> v = entry.getValue();
      if (v != null && !v.isEmpty()) {
        for (String value : entry.getValue()) {
          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

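  /**
   * Convert a table-CFs map to its string form, e.g. {@code ns1.table1:cf1,cf2;table2}; returns
   * null if the map is null.
   */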
  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    return convert(convert(tableCfs));
  }

  /**
   * Convert string to TableCFs Object. This is only for reading TableCFs information from the
   * TableCF node. Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
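   * <p>
   * For example, {@code "ns1.table1:cf1,cf2;table3"} yields one entry for table {@code ns1:table1}
   * with families cf1 and cf2, and one entry for table {@code table3} in the default namespace
   * with no explicit family list.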
   */
  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
      return null;
    }

    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    List<String> tables = Splitter.on(';').splitToList(tableCFsConfig);
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.size());

    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.length() == 0) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      // for each table: "table#cf1,cf2" or "table"
      List<String> pair = Splitter.on(':').splitToList(tab);
      if (pair.size() > 2) {
        LOG.info("incorrect format:" + tableCFsConfig);
        continue;
      }
      assert pair.size() > 0;
      Iterator<String> i = pair.iterator();
      String tabName = i.next().trim();
      if (tabName.length() == 0) {
        LOG.info("incorrect format:" + tableCFsConfig);
        continue;
      }

      tableCFBuilder.clear();
      // split namespace from tableName
      String ns = "default";
      String tName = tabName;
      List<String> dbs = Splitter.on('.').splitToList(tabName);
      if (dbs != null && dbs.size() == 2) {
        Iterator<String> ii = dbs.iterator();
        ns = ii.next();
        tName = ii.next();
      }
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));

      // 3 parse "cf1,cf2" part to List<cf>
      if (i.hasNext()) {
        List<String> cfsList = Splitter.on(',').splitToList(i.next());
        for (String cf : cfsList) {
          String cfName = cf.trim();
          if (cfName.length() > 0) {
            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
          }
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

  /**
   * Convert TableCFs Object to String. Output String Format:
   * ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
   */
  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
      if (StringUtils.isNotEmpty(namespace)) {
        sb.append(namespace).append(".")
          .append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":");
      } else {
        sb.append(tableCF.getTableName().toString()).append(":");
      }
      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
      }
      sb.deleteCharAt(sb.length() - 1).append(";");
    }
    if (sb.length() > 0) {
      sb.deleteCharAt(sb.length() - 1);
    }
    return sb.toString();
  }

  /**
   * Get the TableCF for the given table name from TableCFs; return null if it does not exist.
   */
  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
    String table) {
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
        return tableCF;
      }
    }
    return null;
  }

  /**
   * Parse bytes into TableCFs. It is used for backward compatibility: old-format bytes have no
   * PB_MAGIC header.
   */
  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
    if (bytes == null) {
      return null;
    }
    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
  }

  /**
   * Convert tableCFs string into Map.
   */
  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
    return convert2Map(tableCFs);
  }

  /**
   * Convert tableCFs Object to Map.
   */
  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
    if (tableCFs == null || tableCFs.length == 0) {
      return null;
    }
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      List<String> families = new ArrayList<>();
      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
        families.add(tableCF.getFamilies(j).toStringUtf8());
      }
      if (families.size() > 0) {
        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
      } else {
        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
      }
    }

    return tableCFsMap;
  }

  /**
   * Parse the serialized representation of a peer configuration.
   * @param bytes Content of a peer znode.
   * @return ReplicationPeerConfig parsed from the passed bytes.
   * @throws DeserializationException if the bytes are empty or cannot be parsed
   */
  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
    throws DeserializationException {
    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
      int pbLen = ProtobufUtil.lengthOfPBMagic();
      ReplicationProtos.ReplicationPeer.Builder builder =
        ReplicationProtos.ReplicationPeer.newBuilder();
      ReplicationProtos.ReplicationPeer peer;
      try {
        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
        peer = builder.build();
      } catch (IOException e) {
        throw new DeserializationException(e);
      }
      return convert(peer);
    } else {
      if (bytes == null || bytes.length <= 0) {
        throw new DeserializationException("Bytes to deserialize should not be empty.");
      }
      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
    }
  }

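  /**
   * Convert a protobuf ReplicationPeer into a {@link ReplicationPeerConfig}, copying the cluster
   * key, endpoint implementation, peer data, configuration, table CFs, namespaces, bandwidth,
   * replicate-all flag, serial flag and remote WAL directory when present.
   */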
  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    if (peer.hasClusterkey()) {
      builder.setClusterKey(peer.getClusterkey());
    }
    if (peer.hasReplicationEndpointImpl()) {
      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
    }

    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
    }

    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
      builder.putConfiguration(pair.getName(), pair.getValue());
    }

    Map<TableName, List<String>> tableCFsMap = convert2Map(
      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
    if (tableCFsMap != null) {
      builder.setTableCFsMap(tableCFsMap);
    }

    List<ByteString> namespacesList = peer.getNamespacesList();
    if (namespacesList != null && namespacesList.size() != 0) {
      builder.setNamespaces(
        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasBandwidth()) {
      builder.setBandwidth(peer.getBandwidth());
    }

    if (peer.hasReplicateAll()) {
      builder.setReplicateAllUserTables(peer.getReplicateAll());
    }

    if (peer.hasSerial()) {
      builder.setSerial(peer.getSerial());
    }

    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
      .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
    if (excludeTableCFsMap != null) {
      builder.setExcludeTableCFsMap(excludeTableCFsMap);
    }

    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
      builder.setExcludeNamespaces(
        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasRemoteWALDir()) {
      builder.setRemoteWALDir(peer.getRemoteWALDir());
    }
    return builder.build();
  }

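  /**
   * Convert a {@link ReplicationPeerConfig} into its protobuf ReplicationPeer form. The cluster
   * key is always set (an empty string when absent) for compatibility with the old proto
   * definition, which marked it as required.
   */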
  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
    ReplicationProtos.ReplicationPeer.Builder builder =
      ReplicationProtos.ReplicationPeer.newBuilder();
    // Cluster key used to be a required field, so we must always set it here until we can make
    // sure that no one still uses the old proto file.
    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
    if (peerConfig.getReplicationEndpointImpl() != null) {
      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
    }

    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
        .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
        .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())).build());
    }

    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName(entry.getKey())
        .setValue(entry.getValue()).build());
    }

    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
    if (tableCFs != null) {
      for (int i = 0; i < tableCFs.length; i++) {
        builder.addTableCfs(tableCFs[i]);
      }
    }
    Set<String> namespaces = peerConfig.getNamespaces();
    if (namespaces != null) {
      for (String namespace : namespaces) {
        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    builder.setBandwidth(peerConfig.getBandwidth());
    builder.setReplicateAll(peerConfig.replicateAllUserTables());
    builder.setSerial(peerConfig.isSerial());

    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
    if (excludeTableCFs != null) {
      for (int i = 0; i < excludeTableCFs.length; i++) {
        builder.addExcludeTableCfs(excludeTableCFs[i]);
      }
    }
    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
    if (excludeNamespaces != null) {
      for (String namespace : excludeNamespaces) {
        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    if (peerConfig.getRemoteWALDir() != null) {
      builder.setRemoteWALDir(peerConfig.getRemoteWALDir());
    }
    return builder.build();
  }

  /**
   * Returns a serialized protobuf of <code>peerConfig</code> with the pb magic prefix prepended,
   * suitable for use as the content of a peer znode, i.e. the content of the PEER_ID znode under
   * /hbase/replication/peers/PEER_ID.
   */
  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
    byte[] bytes = convert(peerConfig).toByteArray();
    return ProtobufUtil.prependPBMagic(bytes);
  }

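  /**
   * Convert a protobuf ReplicationPeerDescription into the client-side
   * {@link ReplicationPeerDescription}, carrying over the peer id, enabled state, peer config and
   * sync replication state.
   */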
  public static ReplicationPeerDescription
    toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) {
    boolean enabled =
      ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
    ReplicationPeerConfig config = convert(desc.getConfig());
    return new ReplicationPeerDescription(desc.getId(), enabled, config,
      toSyncReplicationState(desc.getSyncReplicationState()));
  }

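  /**
   * Convert a client-side {@link ReplicationPeerDescription} into its protobuf form.
   */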
  public static ReplicationProtos.ReplicationPeerDescription
    toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
    ReplicationProtos.ReplicationPeerDescription.Builder builder =
      ReplicationProtos.ReplicationPeerDescription.newBuilder();
    builder.setId(desc.getPeerId());

    ReplicationProtos.ReplicationState.Builder stateBuilder =
      ReplicationProtos.ReplicationState.newBuilder();
    stateBuilder.setState(desc.isEnabled()
      ? ReplicationProtos.ReplicationState.State.ENABLED
      : ReplicationProtos.ReplicationState.State.DISABLED);
    builder.setState(stateBuilder.build());

    builder.setConfig(convert(desc.getPeerConfig()));
    builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState()));

    return builder.build();
  }

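  /**
   * Convert a {@link SyncReplicationState} into its protobuf form, using the enum ordinal as the
   * protobuf state number.
   */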
  public static ReplicationProtos.SyncReplicationState
    toSyncReplicationState(SyncReplicationState state) {
    ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder =
      ReplicationProtos.SyncReplicationState.newBuilder();
    syncReplicationStateBuilder
      .setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal()));
    return syncReplicationStateBuilder.build();
  }

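  /**
   * Convert a protobuf SyncReplicationState back into the {@link SyncReplicationState} enum.
   */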
  public static SyncReplicationState
    toSyncReplicationState(ReplicationProtos.SyncReplicationState state) {
    return SyncReplicationState.valueOf(state.getState().getNumber());
  }

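  /**
   * Return a new {@link ReplicationPeerConfig} with the given table CFs appended to the peer's
   * table-CFs map. If the peer has no table-CFs map yet, the given map is used directly;
   * otherwise the two maps are merged. The passed-in peer config is not modified.
   */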
  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      builder.setTableCFsMap(tableCfs);
    } else {
      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
    }
    return builder.build();
  }

  /**
   * Helper method to add/remove base peer configs from Configuration to ReplicationPeerConfig.
   * This merges the user supplied peer configuration
   * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs provided
   * as property hbase.replication.peer.base.config in hbase configuration. Expected format for
   * this hbase configuration is "k1=v1;k2=v2,v2_1;k3=""". If a value is empty, it will remove the
   * existing key-value from peer config.
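   * <p>
   * For example, with {@code hbase.replication.peer.base.config} set to {@code "k1=v1;k2="}, the
   * returned peer config will contain {@code k1=v1}, and any existing {@code k2} entry will be
   * removed.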
   * @param conf               cluster Configuration from which the base peer configs are read
   * @param receivedPeerConfig the peer config to merge the base configs into
   * @return ReplicationPeerConfig containing updated configs.
   */
  public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf,
    ReplicationPeerConfig receivedPeerConfig) {
    ReplicationPeerConfigBuilder copiedPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(receivedPeerConfig);

    Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
    if (basePeerConfigs.length() != 0) {
      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
        .withKeyValueSeparator("=").split(basePeerConfigs);
      for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) {
        String configName = entry.getKey();
        String configValue = entry.getValue();
        // If the config is provided with an empty value, e.g. k1="",
        // we remove it from the peer config. Requiring an explicit empty value ensures that
        // no other config is removed unknowingly.
        if (Strings.isNullOrEmpty(configValue)) {
          copiedPeerConfigBuilder.removeConfiguration(configName);
        } else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) {
          // update the configuration if the exact config name and value do not already exist
          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
        }
      }
    }

    return copiedPeerConfigBuilder.build();
  }

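  /**
   * Return a new {@link ReplicationPeerConfig} with the given table CFs appended to the peer's
   * exclude-table-CFs map. Throws {@link ReplicationException} if {@code excludeTableCfs} is null.
   */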
  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      builder.setExcludeTableCFsMap(excludeTableCfs);
    } else {
      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
    }
    return builder.build();
  }

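  /**
   * Merge {@code tableCfs} into a copy of {@code preTableCfs}. If a table is mapped to a null or
   * empty family list on either side, the merged entry for that table becomes null (no explicit
   * family list); otherwise the two family lists are unioned.
   */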
  private static Map<TableName, List<String>>
    mergeTableCFs(Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
          newTableCfs.put(table, null);
        } else {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.addAll(appendCfs);
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        if (appendCfs == null || appendCfs.isEmpty()) {
          newTableCfs.put(table, null);
        } else {
          newTableCfs.put(table, Lists.newArrayList(appendCfs));
        }
      }
    }
    return newTableCfs;
  }

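  /**
   * Deep copy of the given table-CFs map: each non-null family list is copied so that the result
   * can be modified without affecting the original map.
   */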
  private static Map<TableName, List<String>>
    copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
    Map<TableName, List<String>> newTableCfs = new HashMap<>();
    preTableCfs.forEach(
      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
    return newTableCfs;
  }

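  /**
   * Return a new {@link ReplicationPeerConfig} with the given table CFs removed from the peer's
   * table-CFs map. Throws {@link ReplicationException} if the peer has no table-CFs map, if a
   * table to remove is not present in it, or if the removal request does not match how the
   * table's families are configured.
   */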
  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          newTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newTableCfs.remove(table);
          } else {
            newTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
          "No table: " + table + " in table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setTableCFsMap(newTableCfs);
    return builder.build();
  }

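  /**
   * Return a new {@link ReplicationPeerConfig} with the given table CFs removed from the peer's
   * exclude-table-CFs map, applying the same validation as
   * {@link #removeTableCFsFromReplicationPeerConfig(Map, ReplicationPeerConfig, String)}.
   */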
  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newExcludeTableCfs.containsKey(table)) {
        List<String> cfs = newExcludeTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          newExcludeTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newExcludeTableCfs.remove(table);
          } else {
            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from exclude-table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
          "No table: " + table + " in exclude-table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setExcludeTableCFsMap(newExcludeTableCfs);
    return builder.build();
  }

  /**
   * Returns the configuration needed to talk to the remote peer cluster.
   * @param conf the base configuration
   * @param peer the description of the replication peer
   * @return the configuration for the peer cluster
   * @throws IOException if the peer cluster configuration cannot be created
   */
  public static Configuration getPeerClusterConfiguration(Configuration conf,
    ReplicationPeerDescription peer) throws IOException {
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    Configuration otherConf;
    try {
      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
    } catch (IOException e) {
      throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
    }

    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return compound;
    }
    return otherConf;
  }
}