View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.TableNotFoundException;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.classification.InterfaceStability;
44  import org.apache.hadoop.hbase.client.Admin;
45  import org.apache.hadoop.hbase.client.Connection;
46  import org.apache.hadoop.hbase.client.ConnectionFactory;
47  import org.apache.hadoop.hbase.client.RegionLocator;
48  import org.apache.hadoop.hbase.replication.ReplicationException;
49  import org.apache.hadoop.hbase.replication.ReplicationFactory;
50  import org.apache.hadoop.hbase.replication.ReplicationPeer;
51  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
52  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
53  import org.apache.hadoop.hbase.replication.ReplicationPeers;
54  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
55  import org.apache.hadoop.hbase.util.Pair;
56  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
57  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
58  import org.apache.zookeeper.KeeperException;
59  import org.apache.zookeeper.data.Stat;
60  
61  import com.google.common.annotations.VisibleForTesting;
62  import com.google.common.collect.Lists;
63  
64  /**
65   * <p>
66   * This class provides the administrative interface to HBase cluster
67   * replication. In order to use it, the cluster and the client using
68   * ReplicationAdmin must be configured with <code>hbase.replication</code>
69   * set to true.
70   * </p>
71   * <p>
72   * Adding a new peer results in creating new outbound connections from every
73   * region server to a subset of region servers on the slave cluster. Each
74   * new stream of replication will start replicating from the beginning of the
75   * current WAL, meaning that edits from the past will be replicated.
76   * </p>
77   * <p>
78   * Removing a peer is a destructive and irreversible operation that stops
79   * all the replication streams for the given cluster and deletes the metadata
80   * used to keep track of the replication state.
81   * </p>
82   * <p>
83   * To see which commands are available in the shell, type
84   * <code>replication</code>.
85   * </p>
86   */
87  @InterfaceAudience.Public
88  @InterfaceStability.Evolving
89  public class ReplicationAdmin implements Closeable {
  /** Logger for this class. */
  private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);

  /** Key under which {@link #listReplicated()} stores the table name of an entry. */
  public static final String TNAME = "tableName";
  /**
   * Key under which {@link #listReplicated()} stores the column family name of an entry.
   * NOTE(review): the value is spelled "columnFamlyName"; since this is a public constant,
   * callers presumably depend on the exact string -- confirm before correcting the spelling.
   */
  public static final String CFNAME = "columnFamlyName";

  // only Global for now, can add other type
  // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
  /** Key under which {@link #listReplicated()} stores the replication type of an entry. */
  public static final String REPLICATIONTYPE = "replicationType";
  /** Decimal string form of {@link HConstants#REPLICATION_SCOPE_GLOBAL}. */
  public static final String REPLICATIONGLOBAL = Integer
      .toString(HConstants.REPLICATION_SCOPE_GLOBAL);

  /** Connection created in the constructor and closed by {@link #close()}. */
  private final Connection connection;
  // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
  // be moved to hbase-server. Resolve it in HBASE-11392.
  /** Queues client built and initialized in the constructor. */
  private final ReplicationQueuesClient replicationQueuesClient;
  /** Peer store that the peer-management methods of this class delegate to. */
  private final ReplicationPeers replicationPeers;
  /**
   * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
   * on {@link #close()}.
   */
  private final ZooKeeperWatcher zkw;
111 
112   /**
113    * Constructor that creates a connection to the local ZooKeeper ensemble.
114    * @param conf Configuration to use
115    * @throws IOException if an internal replication error occurs
116    * @throws RuntimeException if replication isn't enabled.
117    */
118   public ReplicationAdmin(Configuration conf) throws IOException {
119     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
120         HConstants.REPLICATION_ENABLE_DEFAULT)) {
121       throw new RuntimeException("hbase.replication isn't true, please " +
122           "enable it in order to use replication");
123     }
124     this.connection = ConnectionFactory.createConnection(conf);
125     try {
126       zkw = createZooKeeperWatcher();
127       try {
128         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
129         this.replicationPeers.init();
130         this.replicationQueuesClient =
131             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
132         this.replicationQueuesClient.init();
133       } catch (Exception exception) {
134         if (zkw != null) {
135           zkw.close();
136         }
137         throw exception;
138       }
139     } catch (Exception exception) {
140       if (connection != null) {
141         connection.close();
142       }
143       if (exception instanceof IOException) {
144         throw (IOException) exception;
145       } else if (exception instanceof RuntimeException) {
146         throw (RuntimeException) exception;
147       } else {
148         throw new IOException("Error initializing the replication admin client.", exception);
149       }
150     }
151   }
152 
153   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
154     // This Abortable doesn't 'abort'... it just logs.
155     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
156       @Override
157       public void abort(String why, Throwable e) {
158         LOG.error(why, e);
159         // We used to call system.exit here but this script can be embedded by other programs that
160         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
161       }
162 
163       @Override
164       public boolean isAborted() {
165         return false;
166       }
167     });
168   }
169 
170   /**
171    * Add a new peer cluster to replicate to.
172    * @param id a short name that identifies the cluster
173    * @param clusterKey the concatenation of the slave cluster's
174    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
175    * @throws IllegalStateException if there's already one slave since
176    * multi-slave isn't supported yet.
177    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
178    */
179   @Deprecated
180   public void addPeer(String id, String clusterKey) throws ReplicationException {
181     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
182   }
183 
184   @Deprecated
185   public void addPeer(String id, String clusterKey, String tableCFs)
186     throws ReplicationException {
187     this.replicationPeers.addPeer(id,
188       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
189   }
190 
191   /**
192    * Add a new remote slave cluster for replication.
193    * @param id a short name that identifies the cluster
194    * @param peerConfig configuration for the replication slave cluster
195    * @param tableCfs the table and column-family list which will be replicated for this peer.
196    * A map from tableName to column family names. An empty collection can be passed
197    * to indicate replicating all column families. Pass null for replicating all table and column
198    * families
199    */
200   public void addPeer(String id, ReplicationPeerConfig peerConfig,
201       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
202     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
203   }
204 
205   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
206     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
207       return null;
208     }
209 
210     Map<TableName, List<String>> tableCFsMap = null;
211     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
212     // parse out (table, cf-list) pairs from tableCFsConfig
213     // format: "table1:cf1,cf2;table2:cfA,cfB"
214     String[] tables = tableCFsConfig.split(";");
215     for (String tab : tables) {
216       // 1 ignore empty table config
217       tab = tab.trim();
218       if (tab.length() == 0) {
219         continue;
220       }
221       // 2 split to "table" and "cf1,cf2"
222       //   for each table: "table:cf1,cf2" or "table"
223       String[] pair = tab.split(":");
224       String tabName = pair[0].trim();
225       if (pair.length > 2 || tabName.length() == 0) {
226         LOG.error("ignore invalid tableCFs setting: " + tab);
227         continue;
228       }
229 
230       // 3 parse "cf1,cf2" part to List<cf>
231       List<String> cfs = null;
232       if (pair.length == 2) {
233         String[] cfsList = pair[1].split(",");
234         for (String cf : cfsList) {
235           String cfName = cf.trim();
236           if (cfName.length() > 0) {
237             if (cfs == null) {
238               cfs = new ArrayList<String>();
239             }
240             cfs.add(cfName);
241           }
242         }
243       }
244 
245       // 4 put <table, List<cf>> to map
246       if (tableCFsMap == null) {
247         tableCFsMap = new HashMap<TableName, List<String>>();
248       }
249       tableCFsMap.put(TableName.valueOf(tabName), cfs);
250     }
251     return tableCFsMap;
252   }
253 
254   @VisibleForTesting
255   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
256     String tableCfsStr = null;
257     if (tableCfs != null) {
258       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
259       StringBuilder builder = new StringBuilder();
260       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
261         if (builder.length() > 0) {
262           builder.append(";");
263         }
264         builder.append(entry.getKey());
265         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
266           builder.append(":");
267           builder.append(StringUtils.join(entry.getValue(), ","));
268         }
269       }
270       tableCfsStr = builder.toString();
271     }
272     return tableCfsStr;
273   }
274 
275   /**
276    * Removes a peer cluster and stops the replication to it.
277    * @param id a short name that identifies the cluster
278    */
279   public void removePeer(String id) throws ReplicationException {
280     this.replicationPeers.removePeer(id);
281   }
282 
283   /**
284    * Restart the replication stream to the specified peer.
285    * @param id a short name that identifies the cluster
286    */
287   public void enablePeer(String id) throws ReplicationException {
288     this.replicationPeers.enablePeer(id);
289   }
290 
291   /**
292    * Stop the replication stream to the specified peer.
293    * @param id a short name that identifies the cluster
294    */
295   public void disablePeer(String id) throws ReplicationException {
296     this.replicationPeers.disablePeer(id);
297   }
298 
299   /**
300    * Get the number of slave clusters the local cluster has.
301    * @return number of slave clusters
302    */
303   public int getPeersCount() {
304     return this.replicationPeers.getAllPeerIds().size();
305   }
306 
307   /**
308    * Map of this cluster's peers for display.
309    * @return A map of peer ids to peer cluster keys
310    * @deprecated use {@link #listPeerConfigs()}
311    */
312   @Deprecated
313   public Map<String, String> listPeers() {
314     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
315     Map<String, String> ret = new HashMap<String, String>(peers.size());
316 
317     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
318       ret.put(entry.getKey(), entry.getValue().getClusterKey());
319     }
320     return ret;
321   }
322 
323   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
324     return this.replicationPeers.getAllPeerConfigs();
325   }
326 
327   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
328     return this.replicationPeers.getReplicationPeerConfig(id);
329   }
330 
331   /**
332    * Get the replicable table-cf config of the specified peer.
333    * @param id a short name that identifies the cluster
334    */
335   public String getPeerTableCFs(String id) throws ReplicationException {
336     return this.replicationPeers.getPeerTableCFsConfig(id);
337   }
338 
339   /**
340    * Set the replicable table-cf config of the specified peer
341    * @param id a short name that identifies the cluster
342    * @deprecated use {@link #setPeerTableCFs(String, Map)}
343    */
344   @Deprecated
345   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
346     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
347   }
348 
349   /**
350    * Append the replicable table-cf config of the specified peer
351    * @param id a short that identifies the cluster
352    * @param tableCfs table-cfs config str
353    * @throws ReplicationException
354    */
355   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
356     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
357   }
358 
359   /**
360    * Append the replicable table-cf config of the specified peer
361    * @param id a short that identifies the cluster
362    * @param tableCfs A map from tableName to column family names
363    * @throws ReplicationException
364    */
365   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
366       throws ReplicationException {
367     if (tableCfs == null) {
368       throw new ReplicationException("tableCfs is null");
369     }
370     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
371     if (preTableCfs == null) {
372       setPeerTableCFs(id, tableCfs);
373       return;
374     }
375 
376     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
377       TableName table = entry.getKey();
378       Collection<String> appendCfs = entry.getValue();
379       if (preTableCfs.containsKey(table)) {
380         List<String> cfs = preTableCfs.get(table);
381         if (cfs == null || appendCfs == null) {
382           preTableCfs.put(table, null);
383         } else {
384           Set<String> cfSet = new HashSet<String>(cfs);
385           cfSet.addAll(appendCfs);
386           preTableCfs.put(table, Lists.newArrayList(cfSet));
387         }
388       } else {
389         if (appendCfs == null || appendCfs.isEmpty()) {
390           preTableCfs.put(table, null);
391         } else {
392           preTableCfs.put(table, Lists.newArrayList(appendCfs));
393         }
394       }
395     }
396     setPeerTableCFs(id, preTableCfs);
397   }
398 
399   /**
400    * Remove some table-cfs from table-cfs config of the specified peer
401    * @param id a short name that identifies the cluster
402    * @param tableCf table-cfs config str
403    * @throws ReplicationException
404    */
405   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
406     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
407   }
408 
409   /**
410    * Remove some table-cfs from config of the specified peer
411    * @param id a short name that identifies the cluster
412    * @param tableCfs A map from tableName to column family names
413    * @throws ReplicationException
414    */
415   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
416       throws ReplicationException {
417     if (tableCfs == null) {
418       throw new ReplicationException("tableCfs is null");
419     }
420 
421     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
422     if (preTableCfs == null) {
423       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
424     }
425     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
426       TableName table = entry.getKey();
427       Collection<String> removeCfs = entry.getValue();
428       if (preTableCfs.containsKey(table)) {
429         List<String> cfs = preTableCfs.get(table);
430         if (cfs == null && removeCfs == null) {
431           preTableCfs.remove(table);
432         } else if (cfs != null && removeCfs != null) {
433           Set<String> cfSet = new HashSet<String>(cfs);
434           cfSet.removeAll(removeCfs);
435           if (cfSet.isEmpty()) {
436             preTableCfs.remove(table);
437           } else {
438             preTableCfs.put(table, Lists.newArrayList(cfSet));
439           }
440         } else if (cfs == null && removeCfs != null) {
441           throw new ReplicationException("Cannot remove cf of table: " + table
442               + " which doesn't specify cfs from table-cfs config in peer: " + id);
443         } else if (cfs != null && removeCfs == null) {
444           throw new ReplicationException("Cannot remove table: " + table
445               + " which has specified cfs from table-cfs config in peer: " + id);
446         }
447       } else {
448         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
449       }
450     }
451     setPeerTableCFs(id, preTableCfs);
452   }
453 
454   /**
455    * Set the replicable table-cf config of the specified peer
456    * @param id a short name that identifies the cluster
457    * @param tableCfs the table and column-family list which will be replicated for this peer.
458    * A map from tableName to column family names. An empty collection can be passed
459    * to indicate replicating all column families. Pass null for replicating all table and column
460    * families
461    */
462   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
463       throws ReplicationException {
464     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
465   }
466 
467   /**
468    * Get the state of the specified peer cluster
469    * @param id String format of the Short name that identifies the peer,
470    * an IllegalArgumentException is thrown if it doesn't exist
471    * @return true if replication is enabled to that peer, false if it isn't
472    */
473   public boolean getPeerState(String id) throws ReplicationException {
474     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
475   }
476 
477   @Override
478   public void close() throws IOException {
479     if (this.zkw != null) {
480       this.zkw.close();
481     }
482     if (this.connection != null) {
483       this.connection.close();
484     }
485   }
486 
487 
488   /**
489    * Find all column families that are replicated from this cluster
490    * @return the full list of the replicated column families of this cluster as:
491    *        tableName, family name, replicationType
492    *
493    * Currently replicationType is Global. In the future, more replication
494    * types may be extended here. For example
495    *  1) the replication may only apply to selected peers instead of all peers
496    *  2) the replicationType may indicate the host Cluster servers as Slave
497    *     for the table:columnFam.
498    */
499   public List<HashMap<String, String>> listReplicated() throws IOException {
500     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
501 
502     Admin admin = connection.getAdmin();
503     HTableDescriptor[] tables;
504     try {
505       tables = admin.listTables();
506     } finally {
507       if (admin!= null) admin.close();
508     }
509 
510     for (HTableDescriptor table : tables) {
511       HColumnDescriptor[] columns = table.getColumnFamilies();
512       String tableName = table.getNameAsString();
513       for (HColumnDescriptor column : columns) {
514         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
515           // At this moment, the columfam is replicated to all peers
516           HashMap<String, String> replicationEntry = new HashMap<String, String>();
517           replicationEntry.put(TNAME, tableName);
518           replicationEntry.put(CFNAME, column.getNameAsString());
519           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
520           replicationColFams.add(replicationEntry);
521         }
522       }
523     }
524 
525     return replicationColFams;
526   }
527 
528   /**
529    * Enable a table's replication switch.
530    * @param tableName name of the table
531    * @throws IOException if a remote or network exception occurs
532    */
533   public void enableTableRep(final TableName tableName) throws IOException {
534     if (tableName == null) {
535       throw new IllegalArgumentException("Table name cannot be null");
536     }
537     try (Admin admin = this.connection.getAdmin()) {
538       if (!admin.tableExists(tableName)) {
539         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
540             + "' does not exists.");
541       }
542     }
543     byte[][] splits = getTableSplitRowKeys(tableName);
544     checkAndSyncTableDescToPeers(tableName, splits);
545     setTableRep(tableName, true);
546   }
547 
548   /**
549    * Disable a table's replication switch.
550    * @param tableName name of the table
551    * @throws IOException if a remote or network exception occurs
552    */
553   public void disableTableRep(final TableName tableName) throws IOException {
554     if (tableName == null) {
555       throw new IllegalArgumentException("Table name is null");
556     }
557     try (Admin admin = this.connection.getAdmin()) {
558       if (!admin.tableExists(tableName)) {
559         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
560             + "' does not exists.");
561       }
562     }
563     setTableRep(tableName, false);
564   }
565 
566   /**
567    * Get the split row keys of table
568    * @param tableName table name
569    * @return array of split row keys
570    * @throws IOException
571    */
572   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
573     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
574       byte[][] startKeys = locator.getStartKeys();
575       if (startKeys.length == 1) {
576         return null;
577       }
578       byte[][] splits = new byte[startKeys.length - 1][];
579       for (int i = 1; i < startKeys.length; i++) {
580         splits[i - 1] = startKeys[i];
581       }
582       return splits;
583     }
584   }
585 
586   /**
587    * Connect to peer and check the table descriptor on peer:
588    * <ol>
589    * <li>Create the same table on peer when not exist.</li>
590    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
591    * </ol>
592    * @param tableName name of the table to sync to the peer
593    * @param splits table split keys
594    * @throws IOException
595    */
596   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
597       throws IOException {
598     List<ReplicationPeer> repPeers = listValidReplicationPeers();
599     if (repPeers == null || repPeers.size() <= 0) {
600       throw new IllegalArgumentException("Found no peer cluster for replication.");
601     }
602     for (ReplicationPeer repPeer : repPeers) {
603       Configuration peerConf = repPeer.getConfiguration();
604       HTableDescriptor htd = null;
605       try (Connection conn = ConnectionFactory.createConnection(peerConf);
606           Admin admin = this.connection.getAdmin();
607           Admin repHBaseAdmin = conn.getAdmin()) {
608         htd = admin.getTableDescriptor(tableName);
609         HTableDescriptor peerHtd = null;
610         if (!repHBaseAdmin.tableExists(tableName)) {
611           repHBaseAdmin.createTable(htd, splits);
612         } else {
613           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
614           if (peerHtd == null) {
615             throw new IllegalArgumentException("Failed to get table descriptor for table "
616                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
617           } else if (!peerHtd.equals(htd)) {
618             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
619                 + " exists in peer cluster " + repPeer.getId()
620                 + ", but the table descriptors are not same when comapred with source cluster."
621                 + " Thus can not enable the table's replication switch.");
622           }
623         }
624       }
625     }
626   }
627 
628   private List<ReplicationPeer> listValidReplicationPeers() {
629     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
630     if (peers == null || peers.size() <= 0) {
631       return null;
632     }
633     List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
634     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
635       String peerId = peerEntry.getKey();
636       String clusterKey = peerEntry.getValue().getClusterKey();
637       Configuration peerConf = new Configuration(this.connection.getConfiguration());
638       Stat s = null;
639       try {
640         ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
641         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
642         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
643         s =
644             zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
645               null);
646         if (null == s) {
647           LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
648           continue;
649         }
650         validPeers.add(peer);
651       } catch (ReplicationException e) {
652         LOG.warn("Failed to get valid replication peers. "
653             + "Error connecting to peer cluster with peerId=" + peerId);
654         LOG.debug("Failure details to get valid replication peers.", e);
655         continue;
656       } catch (KeeperException e) {
657         LOG.warn("Failed to get valid replication peers. KeeperException code="
658             + e.code().intValue());
659         LOG.debug("Failure details to get valid replication peers.", e);
660         continue;
661       } catch (InterruptedException e) {
662         LOG.warn("Failed to get valid replication peers due to InterruptedException.");
663         LOG.debug("Failure details to get valid replication peers.", e);
664         Thread.currentThread().interrupt();
665         continue;
666       } catch (IOException e) {
667         LOG.warn("Failed to get valid replication peers due to IOException.");
668         LOG.debug("Failure details to get valid replication peers.", e);
669         continue;
670       }
671     }
672     return validPeers;
673   }
674 
675   /**
676    * Set the table's replication switch if the table's replication switch is already not set.
677    * @param tableName name of the table
678    * @param isRepEnabled is replication switch enable or disable
679    * @throws IOException if a remote or network exception occurs
680    */
681   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
682     Admin admin = null;
683     try {
684       admin = this.connection.getAdmin();
685       HTableDescriptor htd = admin.getTableDescriptor(tableName);
686       if (isTableRepEnabled(htd) ^ isRepEnabled) {
687         boolean isOnlineSchemaUpdateEnabled =
688             this.connection.getConfiguration()
689                 .getBoolean("hbase.online.schema.update.enable", true);
690         if (!isOnlineSchemaUpdateEnabled) {
691           admin.disableTable(tableName);
692         }
693         for (HColumnDescriptor hcd : htd.getFamilies()) {
694           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
695               : HConstants.REPLICATION_SCOPE_LOCAL);
696         }
697         admin.modifyTable(tableName, htd);
698         if (!isOnlineSchemaUpdateEnabled) {
699           admin.enableTable(tableName);
700         }
701       }
702     } finally {
703       if (admin != null) {
704         try {
705           admin.close();
706         } catch (IOException e) {
707           LOG.warn("Failed to close admin connection.");
708           LOG.debug("Details on failure to close admin connection.", e);
709         }
710       }
711     }
712   }
713 
714   /**
715    * @param htd table descriptor details for the table to check
716    * @return true if table's replication switch is enabled
717    */
718   private boolean isTableRepEnabled(HTableDescriptor htd) {
719     for (HColumnDescriptor hcd : htd.getFamilies()) {
720       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
721         return false;
722       }
723     }
724     return true;
725   }
726 }