View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Abortable;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.TableNotFoundException;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.classification.InterfaceStability;
46  import org.apache.hadoop.hbase.client.Admin;
47  import org.apache.hadoop.hbase.client.Connection;
48  import org.apache.hadoop.hbase.client.ConnectionFactory;
49  import org.apache.hadoop.hbase.client.RegionLocator;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationFactory;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer;
53  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
57  import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
58  import org.apache.hadoop.hbase.util.Pair;
59  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
60
61  /**
62   * <p>
63   * This class provides the administrative interface to HBase cluster
64   * replication. In order to use it, the cluster and the client using
65   * ReplicationAdmin must be configured with <code>hbase.replication</code>
66   * set to true.
67   * </p>
68   * <p>
69   * Adding a new peer results in creating new outbound connections from every
70   * region server to a subset of region servers on the slave cluster. Each
71   * new stream of replication will start replicating from the beginning of the
72   * current WAL, meaning that edits from the past will be replicated.
73   * </p>
74   * <p>
75   * Removing a peer is a destructive and irreversible operation that stops
76   * all the replication streams for the given cluster and deletes the metadata
77   * used to keep track of the replication state.
78   * </p>
79   * <p>
80   * To see which commands are available in the shell, type
81   * <code>replication</code>.
82   * </p>
83   */
84  @InterfaceAudience.Public
85  @InterfaceStability.Evolving
86  public class ReplicationAdmin implements Closeable {
87    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
88
89    public static final String TNAME = "tableName";
90    public static final String CFNAME = "columnFamilyName";
91
92    // only Global for now, can add other type
93    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
94    public static final String REPLICATIONTYPE = "replicationType";
95    public static final String REPLICATIONGLOBAL =
96        Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
97    public static final String REPLICATIONSERIAL =
98        Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
99
100   private final Connection connection;
101   // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
102   // be moved to hbase-server. Resolve it in HBASE-11392.
103   private final ReplicationQueuesClient replicationQueuesClient;
104   private final ReplicationPeers replicationPeers;
105   /**
106    * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
107    * on {@link #close()}.
108    */
109   private final ZooKeeperWatcher zkw;
110
111   /**
112    * Constructor that creates a connection to the local ZooKeeper ensemble.
113    * @param conf Configuration to use
114    * @throws IOException if an internal replication error occurs
115    * @throws RuntimeException if replication isn't enabled.
116    */
117   public ReplicationAdmin(Configuration conf) throws IOException {
118     this.connection = ConnectionFactory.createConnection(conf);
119     try {
120       zkw = createZooKeeperWatcher();
121       try {
122         this.replicationQueuesClient =
123             ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf,
124             this.connection, zkw));
125         this.replicationQueuesClient.init();
126         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
127           this.replicationQueuesClient, this.connection);
128         this.replicationPeers.init();
129       } catch (Exception exception) {
130         if (zkw != null) {
131           zkw.close();
132         }
133         throw exception;
134       }
135     } catch (Exception exception) {
136       if (connection != null) {
137         connection.close();
138       }
139       if (exception instanceof IOException) {
140         throw (IOException) exception;
141       } else if (exception instanceof RuntimeException) {
142         throw (RuntimeException) exception;
143       } else {
144         throw new IOException("Error initializing the replication admin client.", exception);
145       }
146     }
147   }
148
149   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
150     // This Abortable doesn't 'abort'... it just logs.
151     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
152       @Override
153       public void abort(String why, Throwable e) {
154         LOG.error(why, e);
155         // We used to call system.exit here but this script can be embedded by other programs that
156         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
157       }
158
159       @Override
160       public boolean isAborted() {
161         return false;
162       }
163     });
164   }
165
166   /**
167    * Add a new remote slave cluster for replication.
168    * @param id a short name that identifies the cluster
169    * @param peerConfig configuration for the replication slave cluster
170    * @param tableCfs the table and column-family list which will be replicated for this peer.
171    * A map from tableName to column family names. An empty collection can be passed
172    * to indicate replicating all column families. Pass null for replicating all table and column
173    * families
174    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
175    * use {@link #addPeer(String, ReplicationPeerConfig)} instead.
176    */
177   @Deprecated
178   public void addPeer(String id, ReplicationPeerConfig peerConfig,
179       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
180     if (tableCfs != null) {
181       peerConfig.setTableCFsMap(tableCfs);
182     }
183     this.replicationPeers.registerPeer(id, peerConfig);
184   }
185
186   /**
187    * Add a new remote slave cluster for replication.
188    * @param id a short name that identifies the cluster
189    * @param peerConfig configuration for the replication slave cluster
190    */
191   public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
192     checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
193       peerConfig.getTableCFsMap());
194     this.replicationPeers.registerPeer(id, peerConfig);
195   }
196
197   /**
198    *  @deprecated as release of 2.0.0, and it will be removed in 3.0.0
199    * */
200   @Deprecated
201   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
202     return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
203   }
204
205   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
206       throws ReplicationException {
207     checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
208       peerConfig.getTableCFsMap());
209     this.replicationPeers.updatePeerConfig(id, peerConfig);
210   }
211
212   /**
213    * Removes a peer cluster and stops the replication to it.
214    * @param id a short name that identifies the cluster
215    */
216   public void removePeer(String id) throws ReplicationException {
217     this.replicationPeers.unregisterPeer(id);
218   }
219
220   /**
221    * Restart the replication stream to the specified peer.
222    * @param id a short name that identifies the cluster
223    */
224   public void enablePeer(String id) throws ReplicationException {
225     this.replicationPeers.enablePeer(id);
226   }
227
228   /**
229    * Stop the replication stream to the specified peer.
230    * @param id a short name that identifies the cluster
231    */
232   public void disablePeer(String id) throws ReplicationException {
233     this.replicationPeers.disablePeer(id);
234   }
235
236   /**
237    * Get the number of slave clusters the local cluster has.
238    * @return number of slave clusters
239    */
240   public int getPeersCount() {
241     return this.replicationPeers.getAllPeerIds().size();
242   }
243
244   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
245     return this.replicationPeers.getAllPeerConfigs();
246   }
247
248   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
249     return this.replicationPeers.getReplicationPeerConfig(id);
250   }
251
252   /**
253    * Get the replicable table-cf config of the specified peer.
254    * @param id a short name that identifies the cluster
255    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
256    * use {@link #getPeerConfig(String)} instead.
257    * */
258   @Deprecated
259   public String getPeerTableCFs(String id) throws ReplicationException {
260     return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id));
261   }
262
263   /**
264    * Append the replicable table-cf config of the specified peer
265    * @param id a short that identifies the cluster
266    * @param tableCfs table-cfs config str
267    * @throws ReplicationException
268    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
269    * use {@link #appendPeerTableCFs(String, Map)} instead.
270    */
271   @Deprecated
272   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
273     appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
274   }
275
276   /**
277    * Append the replicable table-cf config of the specified peer
278    * @param id a short that identifies the cluster
279    * @param tableCfs A map from tableName to column family names
280    * @throws ReplicationException
281    */
282   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
283       throws ReplicationException {
284     if (tableCfs == null) {
285       throw new ReplicationException("tableCfs is null");
286     }
287     Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
288     if (preTableCfs == null) {
289       setPeerTableCFs(id, tableCfs);
290       return;
291     }
292     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
293       TableName table = entry.getKey();
294       Collection<String> appendCfs = entry.getValue();
295       if (preTableCfs.containsKey(table)) {
296         List<String> cfs = preTableCfs.get(table);
297         if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
298           preTableCfs.put(table, null);
299         } else {
300           Set<String> cfSet = new HashSet<String>(cfs);
301           cfSet.addAll(appendCfs);
302           preTableCfs.put(table, Lists.newArrayList(cfSet));
303         }
304       } else {
305         if (appendCfs == null || appendCfs.isEmpty()) {
306           preTableCfs.put(table, null);
307         } else {
308           preTableCfs.put(table, Lists.newArrayList(appendCfs));
309         }
310       }
311     }
312     setPeerTableCFs(id, preTableCfs);
313   }
314
315   /**
316    * Remove some table-cfs from table-cfs config of the specified peer
317    * @param id a short name that identifies the cluster
318    * @param tableCf table-cfs config str
319    * @throws ReplicationException
320    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
321    * use {@link #removePeerTableCFs(String, Map)} instead.
322    */
323   @Deprecated
324   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
325     removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
326   }
327
328   /**
329    * Remove some table-cfs from config of the specified peer
330    * @param id a short name that identifies the cluster
331    * @param tableCfs A map from tableName to column family names
332    * @throws ReplicationException
333    */
334   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
335       throws ReplicationException {
336     if (tableCfs == null) {
337       throw new ReplicationException("tableCfs is null");
338     }
339     Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
340     if (preTableCfs == null) {
341       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
342     }
343     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
344
345       TableName table = entry.getKey();
346       Collection<String> removeCfs = entry.getValue();
347       if (preTableCfs.containsKey(table)) {
348         List<String> cfs = preTableCfs.get(table);
349         if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
350           preTableCfs.remove(table);
351         } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
352           Set<String> cfSet = new HashSet<String>(cfs);
353           cfSet.removeAll(removeCfs);
354           if (cfSet.isEmpty()) {
355             preTableCfs.remove(table);
356           } else {
357             preTableCfs.put(table, Lists.newArrayList(cfSet));
358           }
359         } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
360           throw new ReplicationException("Cannot remove cf of table: " + table
361               + " which doesn't specify cfs from table-cfs config in peer: " + id);
362         } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
363           throw new ReplicationException("Cannot remove table: " + table
364               + " which has specified cfs from table-cfs config in peer: " + id);
365         }
366       } else {
367         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
368       }
369     }
370     setPeerTableCFs(id, preTableCfs);
371   }
372
373   /**
374    * Set the replicable table-cf config of the specified peer
375    * @param id a short name that identifies the cluster
376    * @param tableCfs the table and column-family list which will be replicated for this peer.
377    * A map from tableName to column family names. An empty collection can be passed
378    * to indicate replicating all column families. Pass null for replicating all table and column
379    * families
380    */
381   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
382       throws ReplicationException {
383     checkNamespacesAndTableCfsConfigConflict(
384       this.replicationPeers.getReplicationPeerConfig(id).getNamespaces(), tableCfs);
385     this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
386   }
387
388   /**
389    * Get the state of the specified peer cluster
390    * @param id String format of the Short name that identifies the peer,
391    * an IllegalArgumentException is thrown if it doesn't exist
392    * @return true if replication is enabled to that peer, false if it isn't
393    */
394   public boolean getPeerState(String id) throws ReplicationException {
395     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
396   }
397
398   @Override
399   public void close() throws IOException {
400     if (this.zkw != null) {
401       this.zkw.close();
402     }
403     if (this.connection != null) {
404       this.connection.close();
405     }
406   }
407
408
409   /**
410    * Find all column families that are replicated from this cluster
411    * @return the full list of the replicated column families of this cluster as:
412    *        tableName, family name, replicationType
413    *
414    * Currently replicationType is Global. In the future, more replication
415    * types may be extended here. For example
416    *  1) the replication may only apply to selected peers instead of all peers
417    *  2) the replicationType may indicate the host Cluster servers as Slave
418    *     for the table:columnFam.
419    */
420   public List<HashMap<String, String>> listReplicated() throws IOException {
421     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
422
423     Admin admin = connection.getAdmin();
424     HTableDescriptor[] tables;
425     try {
426       tables = admin.listTables();
427     } finally {
428       if (admin!= null) admin.close();
429     }
430
431     for (HTableDescriptor table : tables) {
432       HColumnDescriptor[] columns = table.getColumnFamilies();
433       String tableName = table.getNameAsString();
434       for (HColumnDescriptor column : columns) {
435         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
436           // At this moment, the columfam is replicated to all peers
437           HashMap<String, String> replicationEntry = new HashMap<String, String>();
438           replicationEntry.put(TNAME, tableName);
439           replicationEntry.put(CFNAME, column.getNameAsString());
440           replicationEntry.put(REPLICATIONTYPE,
441               column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
442                   REPLICATIONGLOBAL :
443                   REPLICATIONSERIAL);
444           replicationColFams.add(replicationEntry);
445         }
446       }
447     }
448
449     return replicationColFams;
450   }
451
452   /**
453    * Enable a table's replication switch.
454    * @param tableName name of the table
455    * @throws IOException if a remote or network exception occurs
456    */
457   public void enableTableRep(final TableName tableName) throws IOException {
458     if (tableName == null) {
459       throw new IllegalArgumentException("Table name cannot be null");
460     }
461     try (Admin admin = this.connection.getAdmin()) {
462       if (!admin.tableExists(tableName)) {
463         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
464             + "' does not exists.");
465       }
466     }
467     byte[][] splits = getTableSplitRowKeys(tableName);
468     checkAndSyncTableDescToPeers(tableName, splits);
469     setTableRep(tableName, true);
470   }
471
472   /**
473    * Disable a table's replication switch.
474    * @param tableName name of the table
475    * @throws IOException if a remote or network exception occurs
476    */
477   public void disableTableRep(final TableName tableName) throws IOException {
478     if (tableName == null) {
479       throw new IllegalArgumentException("Table name is null");
480     }
481     try (Admin admin = this.connection.getAdmin()) {
482       if (!admin.tableExists(tableName)) {
483         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
484             + "' does not exists.");
485       }
486     }
487     setTableRep(tableName, false);
488   }
489
490   /**
491    * Get the split row keys of table
492    * @param tableName table name
493    * @return array of split row keys
494    * @throws IOException
495    */
496   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
497     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
498       byte[][] startKeys = locator.getStartKeys();
499       if (startKeys.length == 1) {
500         return null;
501       }
502       byte[][] splits = new byte[startKeys.length - 1][];
503       for (int i = 1; i < startKeys.length; i++) {
504         splits[i - 1] = startKeys[i];
505       }
506       return splits;
507     }
508   }
509
510   /**
511    * Connect to peer and check the table descriptor on peer:
512    * <ol>
513    * <li>Create the same table on peer when not exist.</li>
514    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
515    * </ol>
516    * @param tableName name of the table to sync to the peer
517    * @param splits table split keys
518    * @throws IOException
519    */
520   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
521       throws IOException {
522     List<ReplicationPeer> repPeers = listReplicationPeers();
523     if (repPeers == null || repPeers.size() <= 0) {
524       throw new IllegalArgumentException("Found no peer cluster for replication.");
525     }
526
527     final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
528
529     for (ReplicationPeer repPeer : repPeers) {
530       Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
531       // TODO Currently peer TableCFs will not include namespace so we need to check only for table
532       // name without namespace in it. Need to correct this logic once we fix HBASE-11386.
533       if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
534         continue;
535       }
536
537       Configuration peerConf = repPeer.getConfiguration();
538       HTableDescriptor htd = null;
539       try (Connection conn = ConnectionFactory.createConnection(peerConf);
540           Admin admin = this.connection.getAdmin();
541           Admin repHBaseAdmin = conn.getAdmin()) {
542         htd = admin.getTableDescriptor(tableName);
543         HTableDescriptor peerHtd = null;
544         if (!repHBaseAdmin.tableExists(tableName)) {
545           repHBaseAdmin.createTable(htd, splits);
546         } else {
547           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
548           if (peerHtd == null) {
549             throw new IllegalArgumentException("Failed to get table descriptor for table "
550                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
551           } else if (!peerHtd.equals(htd)) {
552             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
553                 + " exists in peer cluster " + repPeer.getId()
554                 + ", but the table descriptors are not same when comapred with source cluster."
555                 + " Thus can not enable the table's replication switch.");
556           }
557         }
558       }
559     }
560   }
561
562   @VisibleForTesting
563   public void peerAdded(String id) throws ReplicationException {
564     this.replicationPeers.peerConnected(id);
565   }
566
567   @VisibleForTesting
568   List<ReplicationPeer> listReplicationPeers() {
569     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
570     if (peers == null || peers.size() <= 0) {
571       return null;
572     }
573     List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
574     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
575       String peerId = peerEntry.getKey();
576       try {
577         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
578         Configuration peerConf = pair.getSecond();
579         ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
580           peerId, pair.getFirst(), this.connection);
581         listOfPeers.add(peer);
582       } catch (ReplicationException e) {
583         LOG.warn("Failed to get valid replication peers. "
584             + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
585             + e.getMessage());
586         LOG.debug("Failure details to get valid replication peers.", e);
587         continue;
588       }
589     }
590     return listOfPeers;
591   }
592
593   /**
594    * Set the table's replication switch if the table's replication switch is already not set.
595    * @param tableName name of the table
596    * @param isRepEnabled is replication switch enable or disable
597    * @throws IOException if a remote or network exception occurs
598    */
599   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
600     Admin admin = null;
601     try {
602       admin = this.connection.getAdmin();
603       HTableDescriptor htd = admin.getTableDescriptor(tableName);
604       if (isTableRepEnabled(htd) ^ isRepEnabled) {
605         for (HColumnDescriptor hcd : htd.getFamilies()) {
606           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
607               : HConstants.REPLICATION_SCOPE_LOCAL);
608         }
609         admin.modifyTable(tableName, htd);
610       }
611     } finally {
612       if (admin != null) {
613         try {
614           admin.close();
615         } catch (IOException e) {
616           LOG.warn("Failed to close admin connection.");
617           LOG.debug("Details on failure to close admin connection.", e);
618         }
619       }
620     }
621   }
622
623   /**
624    * @param htd table descriptor details for the table to check
625    * @return true if table's replication switch is enabled
626    */
627   private boolean isTableRepEnabled(HTableDescriptor htd) {
628     for (HColumnDescriptor hcd : htd.getFamilies()) {
629       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
630           && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
631         return false;
632       }
633     }
634     return true;
635   }
636
637   /**
638    * Set a namespace in the peer config means that all tables in this namespace
639    * will be replicated to the peer cluster.
640    *
641    * 1. If you already have set a namespace in the peer config, then you can't set any table
642    *    of this namespace to the peer config.
643    * 2. If you already have set a table in the peer config, then you can't set this table's
644    *    namespace to the peer config.
645    *
646    * @param namespaces
647    * @param tableCfs
648    * @throws ReplicationException
649    */
650   private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
651       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
652     if (namespaces == null || namespaces.isEmpty()) {
653       return;
654     }
655     if (tableCfs == null || tableCfs.isEmpty()) {
656       return;
657     }
658     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
659       TableName table = entry.getKey();
660       if (namespaces.contains(table.getNamespaceAsString())) {
661         throw new ReplicationException(
662             "Table-cfs config conflict with namespaces config in peer");
663       }
664     }
665   }
666 }