View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.classification.InterfaceStability;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Abortable;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.TableNotFoundException;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.classification.InterfaceStability;
46  import org.apache.hadoop.hbase.client.Admin;
47  import org.apache.hadoop.hbase.client.HBaseAdmin;
48  import org.apache.hadoop.hbase.client.Connection;
49  import org.apache.hadoop.hbase.client.ConnectionFactory;
50  import org.apache.hadoop.hbase.client.RegionLocator;
51  import org.apache.hadoop.hbase.replication.ReplicationException;
52  import org.apache.hadoop.hbase.replication.ReplicationFactory;
53  import org.apache.hadoop.hbase.replication.ReplicationPeer;
54  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
55  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
56  import org.apache.hadoop.hbase.replication.ReplicationPeers;
57  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
58  import org.apache.hadoop.hbase.util.Pair;
59  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
60  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
61  import org.apache.zookeeper.KeeperException;
62  import org.apache.zookeeper.data.Stat;
63  
64  import com.google.common.annotations.VisibleForTesting;
65  import com.google.common.collect.Lists;
66  
67  /**
68   * <p>
69   * This class provides the administrative interface to HBase cluster
70   * replication. In order to use it, the cluster and the client using
71   * ReplicationAdmin must be configured with <code>hbase.replication</code>
72   * set to true.
73   * </p>
74   * <p>
75   * Adding a new peer results in creating new outbound connections from every
76   * region server to a subset of region servers on the slave cluster. Each
77   * new stream of replication will start replicating from the beginning of the
78   * current WAL, meaning that edits from that past will be replicated.
79   * </p>
80   * <p>
81   * Removing a peer is a destructive and irreversible operation that stops
82   * all the replication streams for the given cluster and deletes the metadata
83   * used to keep track of the replication state.
84   * </p>
85   * <p>
86   * To see which commands are available in the shell, type
87   * <code>replication</code>.
88   * </p>
89   */
90  @InterfaceAudience.Public
91  @InterfaceStability.Evolving
92  public class ReplicationAdmin implements Closeable {
93    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
94  
95    public static final String TNAME = "tableName";
96    public static final String CFNAME = "columnFamlyName";
97  
98    // only Global for now, can add other type
99    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
100   public static final String REPLICATIONTYPE = "replicationType";
101   public static final String REPLICATIONGLOBAL = Integer
102       .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
103 
104   private final Connection connection;
105   // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
106   // be moved to hbase-server. Resolve it in HBASE-11392.
107   private final ReplicationQueuesClient replicationQueuesClient;
108   private final ReplicationPeers replicationPeers;
109   /**
110    * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
111    * on {@link #close()}.
112    */
113   private final ZooKeeperWatcher zkw;
114 
115   /**
116    * Constructor that creates a connection to the local ZooKeeper ensemble.
117    * @param conf Configuration to use
118    * @throws IOException if an internal replication error occurs
119    * @throws RuntimeException if replication isn't enabled.
120    */
121   public ReplicationAdmin(Configuration conf) throws IOException {
122     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
123         HConstants.REPLICATION_ENABLE_DEFAULT)) {
124       throw new RuntimeException("hbase.replication isn't true, please " +
125           "enable it in order to use replication");
126     }
127     this.connection = ConnectionFactory.createConnection(conf);
128     try {
129       zkw = createZooKeeperWatcher();
130       try {
131         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
132         this.replicationPeers.init();
133         this.replicationQueuesClient =
134             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
135         this.replicationQueuesClient.init();
136       } catch (Exception exception) {
137         if (zkw != null) {
138           zkw.close();
139         }
140         throw exception;
141       }
142     } catch (Exception exception) {
143       if (connection != null) {
144         connection.close();
145       }
146       if (exception instanceof IOException) {
147         throw (IOException) exception;
148       } else if (exception instanceof RuntimeException) {
149         throw (RuntimeException) exception;
150       } else {
151         throw new IOException("Error initializing the replication admin client.", exception);
152       }
153     }
154   }
155 
156   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
157     // This Abortable doesn't 'abort'... it just logs.
158     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
159       @Override
160       public void abort(String why, Throwable e) {
161         LOG.error(why, e);
162         // We used to call system.exit here but this script can be embedded by other programs that
163         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
164       }
165 
166       @Override
167       public boolean isAborted() {
168         return false;
169       }
170     });
171   }
172 
173   /**
174    * Add a new peer cluster to replicate to.
175    * @param id a short name that identifies the cluster
176    * @param clusterKey the concatenation of the slave cluster's
177    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
178    * @throws IllegalStateException if there's already one slave since
179    * multi-slave isn't supported yet.
180    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
181    */
182   @Deprecated
183   public void addPeer(String id, String clusterKey) throws ReplicationException {
184     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
185   }
186 
187   @Deprecated
188   public void addPeer(String id, String clusterKey, String tableCFs)
189     throws ReplicationException {
190     this.replicationPeers.addPeer(id,
191       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
192   }
193 
194   /**
195    * Add a new remote slave cluster for replication.
196    * @param id a short name that identifies the cluster
197    * @param peerConfig configuration for the replication slave cluster
198    * @param tableCfs the table and column-family list which will be replicated for this peer.
199    * A map from tableName to column family names. An empty collection can be passed
200    * to indicate replicating all column families. Pass null for replicating all table and column
201    * families
202    */
203   public void addPeer(String id, ReplicationPeerConfig peerConfig,
204       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
205     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
206   }
207 
208   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
209     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
210       return null;
211     }
212 
213     Map<TableName, List<String>> tableCFsMap = null;
214     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
215     // parse out (table, cf-list) pairs from tableCFsConfig
216     // format: "table1:cf1,cf2;table2:cfA,cfB"
217     String[] tables = tableCFsConfig.split(";");
218     for (String tab : tables) {
219       // 1 ignore empty table config
220       tab = tab.trim();
221       if (tab.length() == 0) {
222         continue;
223       }
224       // 2 split to "table" and "cf1,cf2"
225       //   for each table: "table:cf1,cf2" or "table"
226       String[] pair = tab.split(":");
227       String tabName = pair[0].trim();
228       if (pair.length > 2 || tabName.length() == 0) {
229         LOG.error("ignore invalid tableCFs setting: " + tab);
230         continue;
231       }
232 
233       // 3 parse "cf1,cf2" part to List<cf>
234       List<String> cfs = null;
235       if (pair.length == 2) {
236         String[] cfsList = pair[1].split(",");
237         for (String cf : cfsList) {
238           String cfName = cf.trim();
239           if (cfName.length() > 0) {
240             if (cfs == null) {
241               cfs = new ArrayList<String>();
242             }
243             cfs.add(cfName);
244           }
245         }
246       }
247 
248       // 4 put <table, List<cf>> to map
249       if (tableCFsMap == null) {
250         tableCFsMap = new HashMap<TableName, List<String>>();
251       }
252       tableCFsMap.put(TableName.valueOf(tabName), cfs);
253     }
254     return tableCFsMap;
255   }
256 
257   @VisibleForTesting
258   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
259     String tableCfsStr = null;
260     if (tableCfs != null) {
261       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
262       StringBuilder builder = new StringBuilder();
263       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
264         if (builder.length() > 0) {
265           builder.append(";");
266         }
267         builder.append(entry.getKey());
268         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
269           builder.append(":");
270           builder.append(StringUtils.join(entry.getValue(), ","));
271         }
272       }
273       tableCfsStr = builder.toString();
274     }
275     return tableCfsStr;
276   }
277 
278   /**
279    * Removes a peer cluster and stops the replication to it.
280    * @param id a short name that identifies the cluster
281    */
282   public void removePeer(String id) throws ReplicationException {
283     this.replicationPeers.removePeer(id);
284   }
285 
286   /**
287    * Restart the replication stream to the specified peer.
288    * @param id a short name that identifies the cluster
289    */
290   public void enablePeer(String id) throws ReplicationException {
291     this.replicationPeers.enablePeer(id);
292   }
293 
294   /**
295    * Stop the replication stream to the specified peer.
296    * @param id a short name that identifies the cluster
297    */
298   public void disablePeer(String id) throws ReplicationException {
299     this.replicationPeers.disablePeer(id);
300   }
301 
302   /**
303    * Get the number of slave clusters the local cluster has.
304    * @return number of slave clusters
305    */
306   public int getPeersCount() {
307     return this.replicationPeers.getAllPeerIds().size();
308   }
309 
310   /**
311    * Map of this cluster's peers for display.
312    * @return A map of peer ids to peer cluster keys
313    * @deprecated use {@link #listPeerConfigs()}
314    */
315   @Deprecated
316   public Map<String, String> listPeers() {
317     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
318     Map<String, String> ret = new HashMap<String, String>(peers.size());
319 
320     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
321       ret.put(entry.getKey(), entry.getValue().getClusterKey());
322     }
323     return ret;
324   }
325 
326   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
327     return this.replicationPeers.getAllPeerConfigs();
328   }
329 
330   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
331     return this.replicationPeers.getReplicationPeerConfig(id);
332   }
333 
334   /**
335    * Get the replicable table-cf config of the specified peer.
336    * @param id a short name that identifies the cluster
337    */
338   public String getPeerTableCFs(String id) throws ReplicationException {
339     return this.replicationPeers.getPeerTableCFsConfig(id);
340   }
341 
342   /**
343    * Set the replicable table-cf config of the specified peer
344    * @param id a short name that identifies the cluster
345    * @deprecated use {@link #setPeerTableCFs(String, Map)}
346    */
347   @Deprecated
348   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
349     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
350   }
351 
352   /**
353    * Append the replicable table-cf config of the specified peer
354    * @param id a short that identifies the cluster
355    * @param tableCfs table-cfs config str
356    * @throws KeeperException
357    */
358   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
359     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
360   }
361 
362   /**
363    * Append the replicable table-cf config of the specified peer
364    * @param id a short that identifies the cluster
365    * @param tableCfs A map from tableName to column family names
366    * @throws KeeperException
367    */
368   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
369       throws ReplicationException {
370     if (tableCfs == null) {
371       throw new ReplicationException("tableCfs is null");
372     }
373     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
374     if (preTableCfs == null) {
375       setPeerTableCFs(id, tableCfs);
376       return;
377     }
378 
379     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
380       TableName table = entry.getKey();
381       Collection<String> appendCfs = entry.getValue();
382       if (preTableCfs.containsKey(table)) {
383         List<String> cfs = preTableCfs.get(table);
384         if (cfs == null || appendCfs == null) {
385           preTableCfs.put(table, null);
386         } else {
387           Set<String> cfSet = new HashSet<String>(cfs);
388           cfSet.addAll(appendCfs);
389           preTableCfs.put(table, Lists.newArrayList(cfSet));
390         }
391       } else {
392         if (appendCfs == null || appendCfs.isEmpty()) {
393           preTableCfs.put(table, null);
394         } else {
395           preTableCfs.put(table, Lists.newArrayList(appendCfs));
396         }
397       }
398     }
399     setPeerTableCFs(id, preTableCfs);
400   }
401 
402   /**
403    * Remove some table-cfs from table-cfs config of the specified peer
404    * @param id a short name that identifies the cluster
405    * @param tableCf table-cfs config str
406    * @throws ReplicationException
407    */
408   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
409     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
410   }
411 
412   /**
413    * Remove some table-cfs from config of the specified peer
414    * @param id a short name that identifies the cluster
415    * @param tableCfs A map from tableName to column family names
416    * @throws ReplicationException
417    */
418   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
419       throws ReplicationException {
420     if (tableCfs == null) {
421       throw new ReplicationException("tableCfs is null");
422     }
423 
424     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
425     if (preTableCfs == null) {
426       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
427     }
428     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
429       TableName table = entry.getKey();
430       Collection<String> removeCfs = entry.getValue();
431       if (preTableCfs.containsKey(table)) {
432         List<String> cfs = preTableCfs.get(table);
433         if (cfs == null && removeCfs == null) {
434           preTableCfs.remove(table);
435         } else if (cfs != null && removeCfs != null) {
436           Set<String> cfSet = new HashSet<String>(cfs);
437           cfSet.removeAll(removeCfs);
438           if (cfSet.isEmpty()) {
439             preTableCfs.remove(table);
440           } else {
441             preTableCfs.put(table, Lists.newArrayList(cfSet));
442           }
443         } else if (cfs == null && removeCfs != null) {
444           throw new ReplicationException("Cannot remove cf of table: " + table
445               + " which doesn't specify cfs from table-cfs config in peer: " + id);
446         } else if (cfs != null && removeCfs == null) {
447           throw new ReplicationException("Cannot remove table: " + table
448               + " which has specified cfs from table-cfs config in peer: " + id);
449         }
450       } else {
451         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
452       }
453     }
454     setPeerTableCFs(id, preTableCfs);
455   }
456 
457   /**
458    * Set the replicable table-cf config of the specified peer
459    * @param id a short name that identifies the cluster
460    * @param tableCfs the table and column-family list which will be replicated for this peer.
461    * A map from tableName to column family names. An empty collection can be passed
462    * to indicate replicating all column families. Pass null for replicating all table and column
463    * families
464    */
465   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
466       throws ReplicationException {
467     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
468   }
469 
470   /**
471    * Get the state of the specified peer cluster
472    * @param id String format of the Short name that identifies the peer,
473    * an IllegalArgumentException is thrown if it doesn't exist
474    * @return true if replication is enabled to that peer, false if it isn't
475    */
476   public boolean getPeerState(String id) throws ReplicationException {
477     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
478   }
479 
480   @Override
481   public void close() throws IOException {
482     if (this.zkw != null) {
483       this.zkw.close();
484     }
485     if (this.connection != null) {
486       this.connection.close();
487     }
488   }
489 
490 
491   /**
492    * Find all column families that are replicated from this cluster
493    * @return the full list of the replicated column families of this cluster as:
494    *        tableName, family name, replicationType
495    *
496    * Currently replicationType is Global. In the future, more replication
497    * types may be extended here. For example
498    *  1) the replication may only apply to selected peers instead of all peers
499    *  2) the replicationType may indicate the host Cluster servers as Slave
500    *     for the table:columnFam.
501    */
502   public List<HashMap<String, String>> listReplicated() throws IOException {
503     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
504 
505     Admin admin = connection.getAdmin();
506     HTableDescriptor[] tables;
507     try {
508       tables = admin.listTables();
509     } finally {
510       if (admin!= null) admin.close();
511     }
512 
513     for (HTableDescriptor table : tables) {
514       HColumnDescriptor[] columns = table.getColumnFamilies();
515       String tableName = table.getNameAsString();
516       for (HColumnDescriptor column : columns) {
517         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
518           // At this moment, the columfam is replicated to all peers
519           HashMap<String, String> replicationEntry = new HashMap<String, String>();
520           replicationEntry.put(TNAME, tableName);
521           replicationEntry.put(CFNAME, column.getNameAsString());
522           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
523           replicationColFams.add(replicationEntry);
524         }
525       }
526     }
527 
528     return replicationColFams;
529   }
530 
531   /**
532    * Enable a table's replication switch.
533    * @param tableName name of the table
534    * @throws IOException if a remote or network exception occurs
535    */
536   public void enableTableRep(final TableName tableName) throws IOException {
537     if (tableName == null) {
538       throw new IllegalArgumentException("Table name cannot be null");
539     }
540     try (Admin admin = this.connection.getAdmin()) {
541       if (!admin.tableExists(tableName)) {
542         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
543             + "' does not exists.");
544       }
545     }
546     byte[][] splits = getTableSplitRowKeys(tableName);
547     checkAndSyncTableDescToPeers(tableName, splits);
548     setTableRep(tableName, true);
549   }
550 
551   /**
552    * Disable a table's replication switch.
553    * @param tableName name of the table
554    * @throws IOException if a remote or network exception occurs
555    */
556   public void disableTableRep(final TableName tableName) throws IOException {
557     if (tableName == null) {
558       throw new IllegalArgumentException("Table name is null");
559     }
560     try (Admin admin = this.connection.getAdmin()) {
561       if (!admin.tableExists(tableName)) {
562         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
563             + "' does not exists.");
564       }
565     }
566     setTableRep(tableName, false);
567   }
568 
569   /**
570    * Get the split row keys of table
571    * @param tableName table name
572    * @return array of split row keys
573    * @throws IOException
574    */
575   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
576     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
577       byte[][] startKeys = locator.getStartKeys();
578       if (startKeys.length == 1) {
579         return null;
580       }
581       byte[][] splits = new byte[startKeys.length - 1][];
582       for (int i = 1; i < startKeys.length; i++) {
583         splits[i - 1] = startKeys[i];
584       }
585       return splits;
586     }
587   }
588 
589   /**
590    * Connect to peer and check the table descriptor on peer:
591    * <ol>
592    * <li>Create the same table on peer when not exist.</li>
593    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
594    * </ol>
595    * @param tableName name of the table to sync to the peer
596    * @param splits table split keys
597    * @throws IOException
598    */
599   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
600       throws IOException {
601     List<ReplicationPeer> repPeers = listValidReplicationPeers();
602     if (repPeers == null || repPeers.size() <= 0) {
603       throw new IllegalArgumentException("Found no peer cluster for replication.");
604     }
605     for (ReplicationPeer repPeer : repPeers) {
606       Configuration peerConf = repPeer.getConfiguration();
607       HTableDescriptor htd = null;
608       try (Connection conn = ConnectionFactory.createConnection(peerConf);
609           Admin admin = this.connection.getAdmin();
610           Admin repHBaseAdmin = conn.getAdmin()) {
611         htd = admin.getTableDescriptor(tableName);
612         HTableDescriptor peerHtd = null;
613         if (!repHBaseAdmin.tableExists(tableName)) {
614           repHBaseAdmin.createTable(htd, splits);
615         } else {
616           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
617           if (peerHtd == null) {
618             throw new IllegalArgumentException("Failed to get table descriptor for table "
619                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
620           } else if (!peerHtd.equals(htd)) {
621             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
622                 + " exists in peer cluster " + repPeer.getId()
623                 + ", but the table descriptors are not same when comapred with source cluster."
624                 + " Thus can not enable the table's replication switch.");
625           }
626         }
627       }
628     }
629   }
630 
631   private List<ReplicationPeer> listValidReplicationPeers() {
632     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
633     if (peers == null || peers.size() <= 0) {
634       return null;
635     }
636     List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
637     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
638       String peerId = peerEntry.getKey();
639       String clusterKey = peerEntry.getValue().getClusterKey();
640       Configuration peerConf = new Configuration(this.connection.getConfiguration());
641       Stat s = null;
642       try {
643         ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
644         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
645         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
646         s =
647             zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
648               null);
649         if (null == s) {
650           LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
651           continue;
652         }
653         validPeers.add(peer);
654       } catch (ReplicationException e) {
655         LOG.warn("Failed to get valid replication peers. "
656             + "Error connecting to peer cluster with peerId=" + peerId);
657         LOG.debug("Failure details to get valid replication peers.", e);
658         continue;
659       } catch (KeeperException e) {
660         LOG.warn("Failed to get valid replication peers. KeeperException code="
661             + e.code().intValue());
662         LOG.debug("Failure details to get valid replication peers.", e);
663         continue;
664       } catch (InterruptedException e) {
665         LOG.warn("Failed to get valid replication peers due to InterruptedException.");
666         LOG.debug("Failure details to get valid replication peers.", e);
667         continue;
668       } catch (IOException e) {
669         LOG.warn("Failed to get valid replication peers due to IOException.");
670         LOG.debug("Failure details to get valid replication peers.", e);
671         continue;
672       }
673     }
674     return validPeers;
675   }
676 
677   /**
678    * Set the table's replication switch if the table's replication switch is already not set.
679    * @param tableName name of the table
680    * @param isRepEnabled is replication switch enable or disable
681    * @throws IOException if a remote or network exception occurs
682    */
683   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
684     Admin admin = null;
685     try {
686       admin = this.connection.getAdmin();
687       HTableDescriptor htd = admin.getTableDescriptor(tableName);
688       if (isTableRepEnabled(htd) ^ isRepEnabled) {
689         boolean isOnlineSchemaUpdateEnabled =
690             this.connection.getConfiguration()
691                 .getBoolean("hbase.online.schema.update.enable", true);
692         if (!isOnlineSchemaUpdateEnabled) {
693           admin.disableTable(tableName);
694         }
695         for (HColumnDescriptor hcd : htd.getFamilies()) {
696           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
697               : HConstants.REPLICATION_SCOPE_LOCAL);
698         }
699         admin.modifyTable(tableName, htd);
700         if (!isOnlineSchemaUpdateEnabled) {
701           admin.enableTable(tableName);
702         }
703       }
704     } finally {
705       if (admin != null) {
706         try {
707           admin.close();
708         } catch (IOException e) {
709           LOG.warn("Failed to close admin connection.");
710           LOG.debug("Details on failure to close admin connection.", e);
711         }
712       }
713     }
714   }
715 
716   /**
717    * @param htd table descriptor details for the table to check
718    * @return true if table's replication switch is enabled
719    */
720   private boolean isTableRepEnabled(HTableDescriptor htd) {
721     for (HColumnDescriptor hcd : htd.getFamilies()) {
722       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
723         return false;
724       }
725     }
726     return true;
727   }
728 }