/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionRegistryFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * Helper for TableCFs Operations.
 */
@InterfaceAudience.Private
@InterfaceStability.Stable
public final class ReplicationPeerConfigUtil {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
    "hbase.replication.peer.base.config";

  private ReplicationPeerConfigUtil() {
  }

  public static String convertToString(Set<String> namespaces) {
    if (namespaces == null) {
      return null;
    }
    return StringUtils.join(namespaces, ';');
  }

  /** Convert a table-CFs map to an array of TableCF objects. */
  public static ReplicationProtos.TableCF[]
    convert(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tableCfs.entrySet().size());
    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      tableCFBuilder.clear();
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
      Collection<String> v = entry.getValue();
      if (v != null && !v.isEmpty()) {
        for (String value : entry.getValue()) {
          tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

  public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
    if (tableCfs == null) {
      return null;
    }
    return convert(convert(tableCfs));
  }

  /**
   * Convert a string to a TableCFs object. This is only for reading TableCFs information from the
   * TableCF node. Input string format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
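   * <p>
   * A minimal illustration (table and family names are made up for this example):
   *
   * <pre>{@code
   * // "ns1.table1" is limited to families cf1 and cf2; "table2" (default namespace) lists no
   * // families, which means all of its column families are replicated.
   * ReplicationProtos.TableCF[] tableCFs =
   *   ReplicationPeerConfigUtil.convert("ns1.table1:cf1,cf2;table2");
   * Map<TableName, List<String>> asMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
   * }</pre>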
   */
  public static ReplicationProtos.TableCF[] convert(String tableCFsConfig) {
    if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
      return null;
    }

    ReplicationProtos.TableCF.Builder tableCFBuilder = ReplicationProtos.TableCF.newBuilder();
    List<String> tables = Splitter.on(';').splitToList(tableCFsConfig);
    List<ReplicationProtos.TableCF> tableCFList = new ArrayList<>(tables.size());

    for (String tab : tables) {
      // 1 ignore empty table config
      tab = tab.trim();
      if (tab.length() == 0) {
        continue;
      }
      // 2 split to "table" and "cf1,cf2"
      // for each table: "table:cf1,cf2" or "table"
      List<String> pair = Splitter.on(':').splitToList(tab);
      if (pair.size() > 2) {
        LOG.info("Incorrect format: " + tableCFsConfig);
        continue;
      }
      assert pair.size() > 0;
      Iterator<String> i = pair.iterator();
      String tabName = i.next().trim();
      if (tabName.length() == 0) {
        LOG.info("Incorrect format: " + tableCFsConfig);
        continue;
      }

      tableCFBuilder.clear();
      // split namespace from tableName
      String ns = "default";
      String tName = tabName;
      List<String> dbs = Splitter.on('.').splitToList(tabName);
      if (dbs != null && dbs.size() == 2) {
        Iterator<String> ii = dbs.iterator();
        ns = ii.next();
        tName = ii.next();
      }
      tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));

      // 3 parse "cf1,cf2" part to List<cf>
      if (i.hasNext()) {
        List<String> cfsList = Splitter.on(',').splitToList(i.next());
        for (String cf : cfsList) {
          String cfName = cf.trim();
          if (cfName.length() > 0) {
            tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
          }
        }
      }
      tableCFList.add(tableCFBuilder.build());
    }
    return tableCFList.toArray(new ReplicationProtos.TableCF[tableCFList.size()]);
  }

  /**
   * Convert TableCFs Object to String. Output String Format:
   * ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
   */
  public static String convert(ReplicationProtos.TableCF[] tableCFs) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
      if (StringUtils.isNotEmpty(namespace)) {
        sb.append(namespace).append(".")
          .append(tableCF.getTableName().getQualifier().toStringUtf8()).append(":");
      } else {
        sb.append(tableCF.getTableName().toString()).append(":");
      }
      for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
        sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
      }
      sb.deleteCharAt(sb.length() - 1).append(";");
    }
    if (sb.length() > 0) {
      sb.deleteCharAt(sb.length() - 1);
    }
    return sb.toString();
  }

  /**
   * Get the TableCF for the given table from TableCFs; return null if it does not exist.
   */
  public static ReplicationProtos.TableCF getTableCF(ReplicationProtos.TableCF[] tableCFs,
    String table) {
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
        return tableCF;
      }
    }
    return null;
  }

  /**
   * Parse bytes into TableCFs. This is used for backward compatibility; old-format bytes have no
   * PB_MAGIC header.
   */
  public static ReplicationProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
    if (bytes == null) {
      return null;
    }
    return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
  }

  /**
   * Convert tableCFs string into Map.
   */
  public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
    ReplicationProtos.TableCF[] tableCFs = convert(tableCFsConfig);
    return convert2Map(tableCFs);
  }

  /**
   * Convert tableCFs Object to Map.
   */
  public static Map<TableName, List<String>> convert2Map(ReplicationProtos.TableCF[] tableCFs) {
    if (tableCFs == null || tableCFs.length == 0) {
      return null;
    }
    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
    for (int i = 0, n = tableCFs.length; i < n; i++) {
      ReplicationProtos.TableCF tableCF = tableCFs[i];
      List<String> families = new ArrayList<>();
      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
        families.add(tableCF.getFamilies(j).toStringUtf8());
      }
      if (families.size() > 0) {
        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
      } else {
        tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
      }
    }

    return tableCFsMap;
  }

  /**
   * Parse the serialized representation of a peer configuration.
   * @param bytes Content of a peer znode.
   * @return the ReplicationPeerConfig parsed from the passed bytes.
   * @throws DeserializationException if the bytes cannot be deserialized
   */
  public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
    throws DeserializationException {
    if (ProtobufUtil.isPBMagicPrefix(bytes)) {
      int pbLen = ProtobufUtil.lengthOfPBMagic();
      ReplicationProtos.ReplicationPeer.Builder builder =
        ReplicationProtos.ReplicationPeer.newBuilder();
      ReplicationProtos.ReplicationPeer peer;
      try {
        ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
        peer = builder.build();
      } catch (IOException e) {
        throw new DeserializationException(e);
      }
      return convert(peer);
    } else {
      if (bytes == null || bytes.length <= 0) {
        throw new DeserializationException("Bytes to deserialize should not be empty.");
      }
      return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
    }
  }

  public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    if (peer.hasClusterkey()) {
      builder.setClusterKey(peer.getClusterkey());
    }
    if (peer.hasReplicationEndpointImpl()) {
      builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
    }

    for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
      builder.putPeerData(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
    }

    for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
      builder.putConfiguration(pair.getName(), pair.getValue());
    }

    Map<TableName, List<String>> tableCFsMap = convert2Map(
      peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
    if (tableCFsMap != null) {
      builder.setTableCFsMap(tableCFsMap);
    }

    List<ByteString> namespacesList = peer.getNamespacesList();
    if (namespacesList != null && namespacesList.size() != 0) {
      builder.setNamespaces(
        namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasBandwidth()) {
      builder.setBandwidth(peer.getBandwidth());
    }

    if (peer.hasReplicateAll()) {
      builder.setReplicateAllUserTables(peer.getReplicateAll());
    }

    if (peer.hasSerial()) {
      builder.setSerial(peer.getSerial());
    }

    Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
      .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
    if (excludeTableCFsMap != null) {
      builder.setExcludeTableCFsMap(excludeTableCFsMap);
    }

    List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
    if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
      builder.setExcludeNamespaces(
        excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
    }

    if (peer.hasRemoteWALDir()) {
      builder.setRemoteWALDir(peer.getRemoteWALDir());
    }
    return builder.build();
  }

  public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
    ReplicationProtos.ReplicationPeer.Builder builder =
      ReplicationProtos.ReplicationPeer.newBuilder();
    // we used to set cluster key as required so here we must always set it, until we can make sure
    // that no one uses the old proto file.
    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
    if (peerConfig.getReplicationEndpointImpl() != null) {
      builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
    }

    for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
      builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
        .setFirst(UnsafeByteOperations.unsafeWrap(entry.getKey()))
        .setSecond(UnsafeByteOperations.unsafeWrap(entry.getValue())).build());
    }

    for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
      builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder().setName(entry.getKey())
        .setValue(entry.getValue()).build());
    }

    ReplicationProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
    if (tableCFs != null) {
      for (int i = 0; i < tableCFs.length; i++) {
        builder.addTableCfs(tableCFs[i]);
      }
    }
    Set<String> namespaces = peerConfig.getNamespaces();
    if (namespaces != null) {
      for (String namespace : namespaces) {
        builder.addNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    builder.setBandwidth(peerConfig.getBandwidth());
    builder.setReplicateAll(peerConfig.replicateAllUserTables());
    builder.setSerial(peerConfig.isSerial());

    ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
    if (excludeTableCFs != null) {
      for (int i = 0; i < excludeTableCFs.length; i++) {
        builder.addExcludeTableCfs(excludeTableCFs[i]);
      }
    }
    Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
    if (excludeNamespaces != null) {
      for (String namespace : excludeNamespaces) {
        builder.addExcludeNamespaces(ByteString.copyFromUtf8(namespace));
      }
    }

    if (peerConfig.getRemoteWALDir() != null) {
      builder.setRemoteWALDir(peerConfig.getRemoteWALDir());
    }
    return builder.build();
  }

  /**
   * Returns the serialized protobuf of <code>peerConfig</code> with the pb magic prefix prepended,
   * suitable for use as the content of a peer znode, i.e. the content of the PEER_ID znode under
   * /hbase/replication/peers/PEER_ID.
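   * <p>
   * Round-trip sketch (illustrative): {@link #parsePeerFrom(byte[])} reads back what this method
   * produces.
   *
   * <pre>{@code
   * byte[] bytes = ReplicationPeerConfigUtil.toByteArray(peerConfig);
   * ReplicationPeerConfig restored = ReplicationPeerConfigUtil.parsePeerFrom(bytes);
   * }</pre>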
   */
  public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
    byte[] bytes = convert(peerConfig).toByteArray();
    return ProtobufUtil.prependPBMagic(bytes);
  }

  public static ReplicationPeerDescription
    toReplicationPeerDescription(ReplicationProtos.ReplicationPeerDescription desc) {
    boolean enabled =
      ReplicationProtos.ReplicationState.State.ENABLED == desc.getState().getState();
    ReplicationPeerConfig config = convert(desc.getConfig());
    return new ReplicationPeerDescription(desc.getId(), enabled, config,
      toSyncReplicationState(desc.getSyncReplicationState()));
  }

  public static ReplicationProtos.ReplicationPeerDescription
    toProtoReplicationPeerDescription(ReplicationPeerDescription desc) {
    ReplicationProtos.ReplicationPeerDescription.Builder builder =
      ReplicationProtos.ReplicationPeerDescription.newBuilder();
    builder.setId(desc.getPeerId());

    ReplicationProtos.ReplicationState.Builder stateBuilder =
      ReplicationProtos.ReplicationState.newBuilder();
    stateBuilder.setState(desc.isEnabled()
      ? ReplicationProtos.ReplicationState.State.ENABLED
      : ReplicationProtos.ReplicationState.State.DISABLED);
    builder.setState(stateBuilder.build());

    builder.setConfig(convert(desc.getPeerConfig()));
    builder.setSyncReplicationState(toSyncReplicationState(desc.getSyncReplicationState()));

    return builder.build();
  }

  public static ReplicationProtos.SyncReplicationState
    toSyncReplicationState(SyncReplicationState state) {
    ReplicationProtos.SyncReplicationState.Builder syncReplicationStateBuilder =
      ReplicationProtos.SyncReplicationState.newBuilder();
    syncReplicationStateBuilder
      .setState(ReplicationProtos.SyncReplicationState.State.forNumber(state.ordinal()));
    return syncReplicationStateBuilder.build();
  }

  public static SyncReplicationState
    toSyncReplicationState(ReplicationProtos.SyncReplicationState state) {
    return SyncReplicationState.valueOf(state.getState().getNumber());
  }

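  /**
   * Returns a new ReplicationPeerConfig based on <code>peerConfig</code> with the given table-CFs
   * map merged into its existing table-CFs map.
   */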
  public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      builder.setTableCFsMap(tableCfs);
    } else {
      builder.setTableCFsMap(mergeTableCFs(preTableCfs, tableCfs));
    }
    return builder.build();
  }

  /**
   * Helper method to add/remove base peer configs from Configuration to ReplicationPeerConfig.
   * This merges the user-supplied peer configuration
   * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs provided as
   * the property hbase.replication.peer.base.config in the hbase configuration. The expected format
   * for this hbase configuration value is "k1=v1;k2=v2,v2_1;k3=""". If a value is empty, the
   * existing key-value pair is removed from the peer config.
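   * <p>
   * A minimal sketch (keys and values are made up for this example):
   *
   * <pre>{@code
   * // With hbase.replication.peer.base.config set to "k1=v1;k2=", k1=v1 is added to (or updated
   * // in) the peer's configuration, and any existing k2 entry is removed from it.
   * ReplicationPeerConfig updated =
   *   ReplicationPeerConfigUtil.updateReplicationBasePeerConfigs(conf, receivedPeerConfig);
   * }</pre>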
   * @param conf               Configuration
   * @param receivedPeerConfig the peer config supplied by the user
   * @return ReplicationPeerConfig containing the updated configs.
   */
  public static ReplicationPeerConfig updateReplicationBasePeerConfigs(Configuration conf,
    ReplicationPeerConfig receivedPeerConfig) {
    ReplicationPeerConfigBuilder copiedPeerConfigBuilder =
      ReplicationPeerConfig.newBuilder(receivedPeerConfig);

    Map<String, String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
    if (basePeerConfigs.length() != 0) {
      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
        .withKeyValueSeparator("=").split(basePeerConfigs);
      for (Map.Entry<String, String> entry : basePeerConfigMap.entrySet()) {
        String configName = entry.getKey();
        String configValue = entry.getValue();
        // If the config is provided with an empty value, e.g. k1="",
        // we remove it from the peer config. Requiring an explicit empty value
        // ensures no other config is removed unknowingly.
        if (Strings.isNullOrEmpty(configValue)) {
          copiedPeerConfigBuilder.removeConfiguration(configName);
        } else if (!receivedPeerConfigMap.getOrDefault(configName, "").equals(configValue)) {
          // update the configuration if the exact config and value do not already exist
          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
        }
      }
    }

    return copiedPeerConfigBuilder.build();
  }

  public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      builder.setExcludeTableCFsMap(excludeTableCfs);
    } else {
      builder.setExcludeTableCFsMap(mergeTableCFs(preExcludeTableCfs, excludeTableCfs));
    }
    return builder.build();
  }

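  /**
   * Merge the table-CFs map to append into a copy of the existing table-CFs map. A null or empty
   * column-family list means "replicate all column families of the table", so if either side maps
   * a table to a null/empty list, the merged entry for that table is null.
   */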
  private static Map<TableName, List<String>>
    mergeTableCFs(Map<TableName, List<String>> preTableCfs, Map<TableName, List<String>> tableCfs) {
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
          newTableCfs.put(table, null);
        } else {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.addAll(appendCfs);
          newTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        if (appendCfs == null || appendCfs.isEmpty()) {
          newTableCfs.put(table, null);
        } else {
          newTableCfs.put(table, Lists.newArrayList(appendCfs));
        }
      }
    }
    return newTableCfs;
  }

  private static Map<TableName, List<String>>
    copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
    Map<TableName, List<String>> newTableCfs = new HashMap<>();
    preTableCfs.forEach(
      (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
    return newTableCfs;
  }

  public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
    if (preTableCfs == null) {
      throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newTableCfs.containsKey(table)) {
        List<String> cfs = newTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          newTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newTableCfs.remove(table);
          } else {
            newTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
          "No table: " + table + " in table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setTableCFsMap(newTableCfs);
    return builder.build();
  }

  public static ReplicationPeerConfig removeExcludeTableCFsFromReplicationPeerConfig(
    Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig, String id)
    throws ReplicationException {
    if (excludeTableCfs == null) {
      throw new ReplicationException("exclude tableCfs is null");
    }
    Map<TableName, List<String>> preExcludeTableCfs = peerConfig.getExcludeTableCFsMap();
    if (preExcludeTableCfs == null) {
      throw new ReplicationException("exclude-Table-Cfs for peer: " + id + " is null");
    }
    Map<TableName, List<String>> newExcludeTableCfs = copyTableCFsMap(preExcludeTableCfs);
    for (Map.Entry<TableName, ? extends Collection<String>> entry : excludeTableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> removeCfs = entry.getValue();
      if (newExcludeTableCfs.containsKey(table)) {
        List<String> cfs = newExcludeTableCfs.get(table);
        if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
          newExcludeTableCfs.remove(table);
        } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.removeAll(removeCfs);
          if (cfSet.isEmpty()) {
            newExcludeTableCfs.remove(table);
          } else {
            newExcludeTableCfs.put(table, Lists.newArrayList(cfSet));
          }
        } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from exclude-table-cfs config in peer: " + id);
        } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
          throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from exclude-table-cfs config in peer: " + id);
        }
      } else {
        throw new ReplicationException(
          "No table: " + table + " in exclude-table-cfs config of peer: " + id);
      }
    }
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.setExcludeTableCFsMap(newExcludeTableCfs);
    return builder.build();
  }

  /**
   * Returns the configuration needed to talk to the remote slave cluster.
   * @param conf       the base configuration
   * @param peerConfig the peer config of replication peer
   * @return the configuration for the peer cluster
   * @throws IOException if creating the peer cluster configuration fails
   */
  public static Configuration getPeerClusterConfiguration(Configuration conf,
    ReplicationPeerConfig peerConfig) throws IOException {
    Configuration otherConf;
    if (ConnectionRegistryFactory.tryParseAsConnectionURI(peerConfig.getClusterKey()) != null) {
      otherConf = HBaseConfiguration.create(conf);
    } else {
      // only need to apply cluster key for old style cluster key
      otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
    }

    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return compound;
    }
    return otherConf;
  }
}