View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  
35  import org.apache.commons.lang.StringUtils;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.Abortable;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.TableNotFoundException;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.client.Admin;
48  import org.apache.hadoop.hbase.client.Connection;
49  import org.apache.hadoop.hbase.client.ConnectionFactory;
50  import org.apache.hadoop.hbase.client.RegionLocator;
51  import org.apache.hadoop.hbase.replication.ReplicationException;
52  import org.apache.hadoop.hbase.replication.ReplicationFactory;
53  import org.apache.hadoop.hbase.replication.ReplicationPeer;
54  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
55  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
56  import org.apache.hadoop.hbase.replication.ReplicationPeers;
57  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
58  import org.apache.hadoop.hbase.util.Pair;
59  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
60  
61  /**
62   * <p>
63   * This class provides the administrative interface to HBase cluster
64   * replication. In order to use it, the cluster and the client using
65   * ReplicationAdmin must be configured with <code>hbase.replication</code>
66   * set to true.
67   * </p>
68   * <p>
69   * Adding a new peer results in creating new outbound connections from every
70   * region server to a subset of region servers on the slave cluster. Each
71   * new stream of replication will start replicating from the beginning of the
72   * current WAL, meaning that edits from the past will be replicated.
73   * </p>
74   * <p>
75   * Removing a peer is a destructive and irreversible operation that stops
76   * all the replication streams for the given cluster and deletes the metadata
77   * used to keep track of the replication state.
78   * </p>
79   * <p>
80   * To see which commands are available in the shell, type
81   * <code>replication</code>.
82   * </p>
83   */
84  @InterfaceAudience.Public
85  @InterfaceStability.Evolving
86  public class ReplicationAdmin implements Closeable {
87    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
88  
89    public static final String TNAME = "tableName";
90    public static final String CFNAME = "columnFamlyName";
91  
92    // only Global for now, can add other type
93    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
94    public static final String REPLICATIONTYPE = "replicationType";
95    public static final String REPLICATIONGLOBAL = Integer
96        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
97  
98    private final Connection connection;
99    // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
100   // be moved to hbase-server. Resolve it in HBASE-11392.
101   private final ReplicationQueuesClient replicationQueuesClient;
102   private final ReplicationPeers replicationPeers;
103   /**
104    * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
105    * on {@link #close()}.
106    */
107   private final ZooKeeperWatcher zkw;
108 
109   /**
110    * Constructor that creates a connection to the local ZooKeeper ensemble.
111    * @param conf Configuration to use
112    * @throws IOException if an internal replication error occurs
113    * @throws RuntimeException if replication isn't enabled.
114    */
115   public ReplicationAdmin(Configuration conf) throws IOException {
116     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
117         HConstants.REPLICATION_ENABLE_DEFAULT)) {
118       throw new RuntimeException("hbase.replication isn't true, please " +
119           "enable it in order to use replication");
120     }
121     this.connection = ConnectionFactory.createConnection(conf);
122     try {
123       zkw = createZooKeeperWatcher();
124       try {
125         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
126         this.replicationPeers.init();
127         this.replicationQueuesClient =
128             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
129         this.replicationQueuesClient.init();
130       } catch (Exception exception) {
131         if (zkw != null) {
132           zkw.close();
133         }
134         throw exception;
135       }
136     } catch (Exception exception) {
137       if (connection != null) {
138         connection.close();
139       }
140       if (exception instanceof IOException) {
141         throw (IOException) exception;
142       } else if (exception instanceof RuntimeException) {
143         throw (RuntimeException) exception;
144       } else {
145         throw new IOException("Error initializing the replication admin client.", exception);
146       }
147     }
148   }
149 
150   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
151     // This Abortable doesn't 'abort'... it just logs.
152     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
153       @Override
154       public void abort(String why, Throwable e) {
155         LOG.error(why, e);
156         // We used to call system.exit here but this script can be embedded by other programs that
157         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
158       }
159 
160       @Override
161       public boolean isAborted() {
162         return false;
163       }
164     });
165   }
166 
167   /**
168    * Add a new peer cluster to replicate to.
169    * @param id a short name that identifies the cluster
170    * @param clusterKey the concatenation of the slave cluster's
171    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
172    * @throws IllegalStateException if there's already one slave since
173    * multi-slave isn't supported yet.
174    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
175    */
176   @Deprecated
177   public void addPeer(String id, String clusterKey) throws ReplicationException {
178     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
179   }
180 
181   @Deprecated
182   public void addPeer(String id, String clusterKey, String tableCFs)
183     throws ReplicationException {
184     this.replicationPeers.addPeer(id,
185       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
186   }
187 
188   /**
189    * Add a new remote slave cluster for replication.
190    * @param id a short name that identifies the cluster
191    * @param peerConfig configuration for the replication slave cluster
192    * @param tableCfs the table and column-family list which will be replicated for this peer.
193    * A map from tableName to column family names. An empty collection can be passed
194    * to indicate replicating all column families. Pass null for replicating all table and column
195    * families
196    */
197   public void addPeer(String id, ReplicationPeerConfig peerConfig,
198       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
199     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
200   }
201 
202   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
203     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
204       return null;
205     }
206 
207     Map<TableName, List<String>> tableCFsMap = null;
208     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
209     // parse out (table, cf-list) pairs from tableCFsConfig
210     // format: "table1:cf1,cf2;table2:cfA,cfB"
211     String[] tables = tableCFsConfig.split(";");
212     for (String tab : tables) {
213       // 1 ignore empty table config
214       tab = tab.trim();
215       if (tab.length() == 0) {
216         continue;
217       }
218       // 2 split to "table" and "cf1,cf2"
219       //   for each table: "table:cf1,cf2" or "table"
220       String[] pair = tab.split(":");
221       String tabName = pair[0].trim();
222       if (pair.length > 2 || tabName.length() == 0) {
223         LOG.error("ignore invalid tableCFs setting: " + tab);
224         continue;
225       }
226 
227       // 3 parse "cf1,cf2" part to List<cf>
228       List<String> cfs = null;
229       if (pair.length == 2) {
230         String[] cfsList = pair[1].split(",");
231         for (String cf : cfsList) {
232           String cfName = cf.trim();
233           if (cfName.length() > 0) {
234             if (cfs == null) {
235               cfs = new ArrayList<String>();
236             }
237             cfs.add(cfName);
238           }
239         }
240       }
241 
242       // 4 put <table, List<cf>> to map
243       if (tableCFsMap == null) {
244         tableCFsMap = new HashMap<TableName, List<String>>();
245       }
246       tableCFsMap.put(TableName.valueOf(tabName), cfs);
247     }
248     return tableCFsMap;
249   }
250 
251   @VisibleForTesting
252   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
253     String tableCfsStr = null;
254     if (tableCfs != null) {
255       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
256       StringBuilder builder = new StringBuilder();
257       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
258         if (builder.length() > 0) {
259           builder.append(";");
260         }
261         builder.append(entry.getKey());
262         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
263           builder.append(":");
264           builder.append(StringUtils.join(entry.getValue(), ","));
265         }
266       }
267       tableCfsStr = builder.toString();
268     }
269     return tableCfsStr;
270   }
271 
272   /**
273    * Removes a peer cluster and stops the replication to it.
274    * @param id a short name that identifies the cluster
275    */
276   public void removePeer(String id) throws ReplicationException {
277     this.replicationPeers.removePeer(id);
278   }
279 
280   /**
281    * Restart the replication stream to the specified peer.
282    * @param id a short name that identifies the cluster
283    */
284   public void enablePeer(String id) throws ReplicationException {
285     this.replicationPeers.enablePeer(id);
286   }
287 
288   /**
289    * Stop the replication stream to the specified peer.
290    * @param id a short name that identifies the cluster
291    */
292   public void disablePeer(String id) throws ReplicationException {
293     this.replicationPeers.disablePeer(id);
294   }
295 
296   /**
297    * Get the number of slave clusters the local cluster has.
298    * @return number of slave clusters
299    */
300   public int getPeersCount() {
301     return this.replicationPeers.getAllPeerIds().size();
302   }
303 
304   /**
305    * Map of this cluster's peers for display.
306    * @return A map of peer ids to peer cluster keys
307    * @deprecated use {@link #listPeerConfigs()}
308    */
309   @Deprecated
310   public Map<String, String> listPeers() {
311     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
312     Map<String, String> ret = new HashMap<String, String>(peers.size());
313 
314     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
315       ret.put(entry.getKey(), entry.getValue().getClusterKey());
316     }
317     return ret;
318   }
319 
320   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
321     return this.replicationPeers.getAllPeerConfigs();
322   }
323 
324   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
325     return this.replicationPeers.getReplicationPeerConfig(id);
326   }
327 
328   /**
329    * Get the replicable table-cf config of the specified peer.
330    * @param id a short name that identifies the cluster
331    */
332   public String getPeerTableCFs(String id) throws ReplicationException {
333     return this.replicationPeers.getPeerTableCFsConfig(id);
334   }
335 
336   /**
337    * Set the replicable table-cf config of the specified peer
338    * @param id a short name that identifies the cluster
339    * @deprecated use {@link #setPeerTableCFs(String, Map)}
340    */
341   @Deprecated
342   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
343     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
344   }
345 
346   /**
347    * Append the replicable table-cf config of the specified peer
348    * @param id a short that identifies the cluster
349    * @param tableCfs table-cfs config str
350    */
351   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
352     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
353   }
354 
355   /**
356    * Append the replicable table-cf config of the specified peer
357    * @param id a short that identifies the cluster
358    * @param tableCfs A map from tableName to column family names
359    */
360   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
361       throws ReplicationException {
362     if (tableCfs == null) {
363       throw new ReplicationException("tableCfs is null");
364     }
365     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
366     if (preTableCfs == null) {
367       setPeerTableCFs(id, tableCfs);
368       return;
369     }
370 
371     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
372       TableName table = entry.getKey();
373       Collection<String> appendCfs = entry.getValue();
374       if (preTableCfs.containsKey(table)) {
375         List<String> cfs = preTableCfs.get(table);
376         if (cfs == null || appendCfs == null) {
377           preTableCfs.put(table, null);
378         } else {
379           Set<String> cfSet = new HashSet<String>(cfs);
380           cfSet.addAll(appendCfs);
381           preTableCfs.put(table, Lists.newArrayList(cfSet));
382         }
383       } else {
384         if (appendCfs == null || appendCfs.isEmpty()) {
385           preTableCfs.put(table, null);
386         } else {
387           preTableCfs.put(table, Lists.newArrayList(appendCfs));
388         }
389       }
390     }
391     setPeerTableCFs(id, preTableCfs);
392   }
393 
394   /**
395    * Remove some table-cfs from table-cfs config of the specified peer
396    * @param id a short name that identifies the cluster
397    * @param tableCf table-cfs config str
398    * @throws ReplicationException
399    */
400   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
401     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
402   }
403 
404   /**
405    * Remove some table-cfs from config of the specified peer
406    * @param id a short name that identifies the cluster
407    * @param tableCfs A map from tableName to column family names
408    * @throws ReplicationException
409    */
410   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
411       throws ReplicationException {
412     if (tableCfs == null) {
413       throw new ReplicationException("tableCfs is null");
414     }
415 
416     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
417     if (preTableCfs == null) {
418       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
419     }
420     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
421       TableName table = entry.getKey();
422       Collection<String> removeCfs = entry.getValue();
423       if (preTableCfs.containsKey(table)) {
424         List<String> cfs = preTableCfs.get(table);
425         if (cfs == null && removeCfs == null) {
426           preTableCfs.remove(table);
427         } else if (cfs != null && removeCfs != null) {
428           Set<String> cfSet = new HashSet<String>(cfs);
429           cfSet.removeAll(removeCfs);
430           if (cfSet.isEmpty()) {
431             preTableCfs.remove(table);
432           } else {
433             preTableCfs.put(table, Lists.newArrayList(cfSet));
434           }
435         } else if (cfs == null && removeCfs != null) {
436           throw new ReplicationException("Cannot remove cf of table: " + table
437               + " which doesn't specify cfs from table-cfs config in peer: " + id);
438         } else if (cfs != null && removeCfs == null) {
439           throw new ReplicationException("Cannot remove table: " + table
440               + " which has specified cfs from table-cfs config in peer: " + id);
441         }
442       } else {
443         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
444       }
445     }
446     setPeerTableCFs(id, preTableCfs);
447   }
448 
449   /**
450    * Set the replicable table-cf config of the specified peer
451    * @param id a short name that identifies the cluster
452    * @param tableCfs the table and column-family list which will be replicated for this peer.
453    * A map from tableName to column family names. An empty collection can be passed
454    * to indicate replicating all column families. Pass null for replicating all table and column
455    * families
456    */
457   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
458       throws ReplicationException {
459     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
460   }
461 
462   /**
463    * Get the state of the specified peer cluster
464    * @param id String format of the Short name that identifies the peer,
465    * an IllegalArgumentException is thrown if it doesn't exist
466    * @return true if replication is enabled to that peer, false if it isn't
467    */
468   public boolean getPeerState(String id) throws ReplicationException {
469     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
470   }
471 
472   @Override
473   public void close() throws IOException {
474     if (this.zkw != null) {
475       this.zkw.close();
476     }
477     if (this.connection != null) {
478       this.connection.close();
479     }
480   }
481 
482 
483   /**
484    * Find all column families that are replicated from this cluster
485    * @return the full list of the replicated column families of this cluster as:
486    *        tableName, family name, replicationType
487    *
488    * Currently replicationType is Global. In the future, more replication
489    * types may be extended here. For example
490    *  1) the replication may only apply to selected peers instead of all peers
491    *  2) the replicationType may indicate the host Cluster servers as Slave
492    *     for the table:columnFam.
493    */
494   public List<HashMap<String, String>> listReplicated() throws IOException {
495     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
496 
497     Admin admin = connection.getAdmin();
498     HTableDescriptor[] tables;
499     try {
500       tables = admin.listTables();
501     } finally {
502       if (admin!= null) admin.close();
503     }
504 
505     for (HTableDescriptor table : tables) {
506       HColumnDescriptor[] columns = table.getColumnFamilies();
507       String tableName = table.getNameAsString();
508       for (HColumnDescriptor column : columns) {
509         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
510           // At this moment, the columfam is replicated to all peers
511           HashMap<String, String> replicationEntry = new HashMap<String, String>();
512           replicationEntry.put(TNAME, tableName);
513           replicationEntry.put(CFNAME, column.getNameAsString());
514           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
515           replicationColFams.add(replicationEntry);
516         }
517       }
518     }
519 
520     return replicationColFams;
521   }
522 
523   /**
524    * Enable a table's replication switch.
525    * @param tableName name of the table
526    * @throws IOException if a remote or network exception occurs
527    */
528   public void enableTableRep(final TableName tableName) throws IOException {
529     if (tableName == null) {
530       throw new IllegalArgumentException("Table name cannot be null");
531     }
532     try (Admin admin = this.connection.getAdmin()) {
533       if (!admin.tableExists(tableName)) {
534         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
535             + "' does not exists.");
536       }
537     }
538     byte[][] splits = getTableSplitRowKeys(tableName);
539     checkAndSyncTableDescToPeers(tableName, splits);
540     setTableRep(tableName, true);
541   }
542 
543   /**
544    * Disable a table's replication switch.
545    * @param tableName name of the table
546    * @throws IOException if a remote or network exception occurs
547    */
548   public void disableTableRep(final TableName tableName) throws IOException {
549     if (tableName == null) {
550       throw new IllegalArgumentException("Table name is null");
551     }
552     try (Admin admin = this.connection.getAdmin()) {
553       if (!admin.tableExists(tableName)) {
554         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
555             + "' does not exists.");
556       }
557     }
558     setTableRep(tableName, false);
559   }
560 
561   /**
562    * Get the split row keys of table
563    * @param tableName table name
564    * @return array of split row keys
565    * @throws IOException
566    */
567   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
568     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
569       byte[][] startKeys = locator.getStartKeys();
570       if (startKeys.length == 1) {
571         return null;
572       }
573       byte[][] splits = new byte[startKeys.length - 1][];
574       for (int i = 1; i < startKeys.length; i++) {
575         splits[i - 1] = startKeys[i];
576       }
577       return splits;
578     }
579   }
580 
581   /**
582    * Connect to peer and check the table descriptor on peer:
583    * <ol>
584    * <li>Create the same table on peer when not exist.</li>
585    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
586    * </ol>
587    * @param tableName name of the table to sync to the peer
588    * @param splits table split keys
589    * @throws IOException
590    */
591   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
592       throws IOException {
593     List<ReplicationPeer> repPeers = listReplicationPeers();
594     if (repPeers == null || repPeers.size() <= 0) {
595       throw new IllegalArgumentException("Found no peer cluster for replication.");
596     }
597     
598     final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
599     
600     for (ReplicationPeer repPeer : repPeers) {
601       Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
602       // TODO Currently peer TableCFs will not include namespace so we need to check only for table
603       // name without namespace in it. Need to correct this logic once we fix HBASE-11386.
604       if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
605         continue;
606       }
607 
608       Configuration peerConf = repPeer.getConfiguration();
609       HTableDescriptor htd = null;
610       try (Connection conn = ConnectionFactory.createConnection(peerConf);
611           Admin admin = this.connection.getAdmin();
612           Admin repHBaseAdmin = conn.getAdmin()) {
613         htd = admin.getTableDescriptor(tableName);
614         HTableDescriptor peerHtd = null;
615         if (!repHBaseAdmin.tableExists(tableName)) {
616           repHBaseAdmin.createTable(htd, splits);
617         } else {
618           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
619           if (peerHtd == null) {
620             throw new IllegalArgumentException("Failed to get table descriptor for table "
621                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
622           } else if (!peerHtd.equals(htd)) {
623             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
624                 + " exists in peer cluster " + repPeer.getId()
625                 + ", but the table descriptors are not same when comapred with source cluster."
626                 + " Thus can not enable the table's replication switch.");
627           }
628         }
629       }
630     }
631   }
632 
633   @VisibleForTesting
634   List<ReplicationPeer> listReplicationPeers() {
635     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
636     if (peers == null || peers.size() <= 0) {
637       return null;
638     }
639     List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
640     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
641       String peerId = peerEntry.getKey();
642       try {
643         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
644         Configuration peerConf = pair.getSecond();
645         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
646             parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
647         listOfPeers.add(peer);
648       } catch (ReplicationException e) {
649         LOG.warn("Failed to get valid replication peers. "
650             + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
651             + e.getMessage());
652         LOG.debug("Failure details to get valid replication peers.", e);
653         continue;
654       }
655     }
656     return listOfPeers;
657   }
658 
659   /**
660    * Set the table's replication switch if the table's replication switch is already not set.
661    * @param tableName name of the table
662    * @param isRepEnabled is replication switch enable or disable
663    * @throws IOException if a remote or network exception occurs
664    */
665   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
666     Admin admin = null;
667     try {
668       admin = this.connection.getAdmin();
669       HTableDescriptor htd = admin.getTableDescriptor(tableName);
670       if (isTableRepEnabled(htd) ^ isRepEnabled) {
671         boolean isOnlineSchemaUpdateEnabled =
672             this.connection.getConfiguration()
673                 .getBoolean("hbase.online.schema.update.enable", true);
674         if (!isOnlineSchemaUpdateEnabled) {
675           admin.disableTable(tableName);
676         }
677         for (HColumnDescriptor hcd : htd.getFamilies()) {
678           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
679               : HConstants.REPLICATION_SCOPE_LOCAL);
680         }
681         admin.modifyTable(tableName, htd);
682         if (!isOnlineSchemaUpdateEnabled) {
683           admin.enableTable(tableName);
684         }
685       }
686     } finally {
687       if (admin != null) {
688         try {
689           admin.close();
690         } catch (IOException e) {
691           LOG.warn("Failed to close admin connection.");
692           LOG.debug("Details on failure to close admin connection.", e);
693         }
694       }
695     }
696   }
697 
698   /**
699    * @param htd table descriptor details for the table to check
700    * @return true if table's replication switch is enabled
701    */
702   private boolean isTableRepEnabled(HTableDescriptor htd) {
703     for (HColumnDescriptor hcd : htd.getFamilies()) {
704       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
705         return false;
706       }
707     }
708     return true;
709   }
710 }