View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.TableNotFoundException;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.hbase.classification.InterfaceStability;
44  import org.apache.hadoop.hbase.client.Admin;
45  import org.apache.hadoop.hbase.client.Connection;
46  import org.apache.hadoop.hbase.client.ConnectionFactory;
47  import org.apache.hadoop.hbase.client.RegionLocator;
48  import org.apache.hadoop.hbase.replication.ReplicationException;
49  import org.apache.hadoop.hbase.replication.ReplicationFactory;
50  import org.apache.hadoop.hbase.replication.ReplicationPeer;
51  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
52  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
53  import org.apache.hadoop.hbase.replication.ReplicationPeers;
54  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
55  import org.apache.hadoop.hbase.util.Pair;
56  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
57  import org.apache.zookeeper.KeeperException;
58  import org.apache.zookeeper.data.Stat;
59  
60  import com.google.common.annotations.VisibleForTesting;
61  import com.google.common.collect.Lists;
62  
63  /**
64   * <p>
65   * This class provides the administrative interface to HBase cluster
66   * replication. In order to use it, the cluster and the client using
67   * ReplicationAdmin must be configured with <code>hbase.replication</code>
68   * set to true.
69   * </p>
70   * <p>
71   * Adding a new peer results in creating new outbound connections from every
72   * region server to a subset of region servers on the slave cluster. Each
73   * new stream of replication will start replicating from the beginning of the
74   * current WAL, meaning that edits from that past will be replicated.
75   * </p>
76   * <p>
77   * Removing a peer is a destructive and irreversible operation that stops
78   * all the replication streams for the given cluster and deletes the metadata
79   * used to keep track of the replication state.
80   * </p>
81   * <p>
82   * To see which commands are available in the shell, type
83   * <code>replication</code>.
84   * </p>
85   */
86  @InterfaceAudience.Public
87  @InterfaceStability.Evolving
88  public class ReplicationAdmin implements Closeable {
89    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
90  
91    public static final String TNAME = "tableName";
92    public static final String CFNAME = "columnFamilyName";
93  
94    // only Global for now, can add other type
95    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
96    public static final String REPLICATIONTYPE = "replicationType";
97    public static final String REPLICATIONGLOBAL = Integer
98        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
99  
100   private final Connection connection;
101   // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
102   // be moved to hbase-server. Resolve it in HBASE-11392.
103   private final ReplicationQueuesClient replicationQueuesClient;
104   private final ReplicationPeers replicationPeers;
105   /**
106    * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
107    * on {@link #close()}.
108    */
109   private final ZooKeeperWatcher zkw;
110 
111   /**
112    * Constructor that creates a connection to the local ZooKeeper ensemble.
113    * @param conf Configuration to use
114    * @throws IOException if an internal replication error occurs
115    * @throws RuntimeException if replication isn't enabled.
116    */
117   public ReplicationAdmin(Configuration conf) throws IOException {
118     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
119         HConstants.REPLICATION_ENABLE_DEFAULT)) {
120       throw new RuntimeException("hbase.replication isn't true, please " +
121           "enable it in order to use replication");
122     }
123     this.connection = ConnectionFactory.createConnection(conf);
124     try {
125       zkw = createZooKeeperWatcher();
126       try {
127         this.replicationQueuesClient =
128             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
129         this.replicationQueuesClient.init();
130         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
131           this.replicationQueuesClient, this.connection);
132         this.replicationPeers.init();
133       } catch (Exception exception) {
134         if (zkw != null) {
135           zkw.close();
136         }
137         throw exception;
138       }
139     } catch (Exception exception) {
140       if (connection != null) {
141         connection.close();
142       }
143       if (exception instanceof IOException) {
144         throw (IOException) exception;
145       } else if (exception instanceof RuntimeException) {
146         throw (RuntimeException) exception;
147       } else {
148         throw new IOException("Error initializing the replication admin client.", exception);
149       }
150     }
151   }
152 
153   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
154     // This Abortable doesn't 'abort'... it just logs.
155     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
156       @Override
157       public void abort(String why, Throwable e) {
158         LOG.error(why, e);
159         // We used to call system.exit here but this script can be embedded by other programs that
160         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
161       }
162 
163       @Override
164       public boolean isAborted() {
165         return false;
166       }
167     });
168   }
169 
170   /**
171    * Add a new peer cluster to replicate to.
172    * @param id a short name that identifies the cluster
173    * @param clusterKey the concatenation of the slave cluster's
174    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
175    * @throws IllegalStateException if there's already one slave since
176    * multi-slave isn't supported yet.
177    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
178    */
179   @Deprecated
180   public void addPeer(String id, String clusterKey) throws ReplicationException {
181     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
182   }
183 
184   @Deprecated
185   public void addPeer(String id, String clusterKey, String tableCFs)
186     throws ReplicationException {
187     this.replicationPeers.addPeer(id,
188       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
189   }
190   
191   /**
192    * Add a new remote slave cluster for replication.
193    * @param id a short name that identifies the cluster
194    * @param peerConfig configuration for the replication slave cluster
195    * @param tableCfs the table and column-family list which will be replicated for this peer.
196    * A map from tableName to column family names. An empty collection can be passed
197    * to indicate replicating all column families. Pass null for replicating all table and column
198    * families
199    */
200   public void addPeer(String id, ReplicationPeerConfig peerConfig,
201       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
202     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
203   }
204 
205   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
206     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
207       return null;
208     }
209 
210     Map<TableName, List<String>> tableCFsMap = null;
211     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
212     // parse out (table, cf-list) pairs from tableCFsConfig
213     // format: "table1:cf1,cf2;table2:cfA,cfB"
214     String[] tables = tableCFsConfig.split(";");
215     for (String tab : tables) {
216       // 1 ignore empty table config
217       tab = tab.trim();
218       if (tab.length() == 0) {
219         continue;
220       }
221       // 2 split to "table" and "cf1,cf2"
222       //   for each table: "table:cf1,cf2" or "table"
223       String[] pair = tab.split(":");
224       String tabName = pair[0].trim();
225       if (pair.length > 2 || tabName.length() == 0) {
226         LOG.error("ignore invalid tableCFs setting: " + tab);
227         continue;
228       }
229 
230       // 3 parse "cf1,cf2" part to List<cf>
231       List<String> cfs = null;
232       if (pair.length == 2) {
233         String[] cfsList = pair[1].split(",");
234         for (String cf : cfsList) {
235           String cfName = cf.trim();
236           if (cfName.length() > 0) {
237             if (cfs == null) {
238               cfs = new ArrayList<String>();
239             }
240             cfs.add(cfName);
241           }
242         }
243       }
244 
245       // 4 put <table, List<cf>> to map
246       if (tableCFsMap == null) {
247         tableCFsMap = new HashMap<TableName, List<String>>();
248       }
249       tableCFsMap.put(TableName.valueOf(tabName), cfs);
250     }
251     return tableCFsMap;
252   }
253 
254   @VisibleForTesting
255   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
256     String tableCfsStr = null;
257     if (tableCfs != null) {
258       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
259       StringBuilder builder = new StringBuilder();
260       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
261         if (builder.length() > 0) {
262           builder.append(";");
263         }
264         builder.append(entry.getKey());
265         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
266           builder.append(":");
267           builder.append(StringUtils.join(entry.getValue(), ","));
268         }
269       }
270       tableCfsStr = builder.toString();
271     }
272     return tableCfsStr;
273   }
274 
275   /**
276    * Removes a peer cluster and stops the replication to it.
277    * @param id a short name that identifies the cluster
278    */
279   public void removePeer(String id) throws ReplicationException {
280     this.replicationPeers.removePeer(id);
281   }
282 
283   /**
284    * Restart the replication stream to the specified peer.
285    * @param id a short name that identifies the cluster
286    */
287   public void enablePeer(String id) throws ReplicationException {
288     this.replicationPeers.enablePeer(id);
289   }
290 
291   /**
292    * Stop the replication stream to the specified peer.
293    * @param id a short name that identifies the cluster
294    */
295   public void disablePeer(String id) throws ReplicationException {
296     this.replicationPeers.disablePeer(id);
297   }
298 
299   /**
300    * Get the number of slave clusters the local cluster has.
301    * @return number of slave clusters
302    */
303   public int getPeersCount() {
304     return this.replicationPeers.getAllPeerIds().size();
305   }
306 
307   /**
308    * Map of this cluster's peers for display.
309    * @return A map of peer ids to peer cluster keys
310    * @deprecated use {@link #listPeerConfigs()}
311    */
312   @Deprecated
313   public Map<String, String> listPeers() {
314     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
315     Map<String, String> ret = new HashMap<String, String>(peers.size());
316 
317     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
318       ret.put(entry.getKey(), entry.getValue().getClusterKey());
319     }
320     return ret;
321   }
322 
323   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
324     return this.replicationPeers.getAllPeerConfigs();
325   }
326 
327   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
328     return this.replicationPeers.getReplicationPeerConfig(id);
329   }
330 
331   /**
332    * Get the replicable table-cf config of the specified peer.
333    * @param id a short name that identifies the cluster
334    */
335   public String getPeerTableCFs(String id) throws ReplicationException {
336     return this.replicationPeers.getPeerTableCFsConfig(id);
337   }
338 
339   /**
340    * Set the replicable table-cf config of the specified peer
341    * @param id a short name that identifies the cluster
342    * @deprecated use {@link #setPeerTableCFs(String, Map)}
343    */
344   @Deprecated
345   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
346     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
347   }
348 
349   /**
350    * Append the replicable table-cf config of the specified peer
351    * @param id a short that identifies the cluster
352    * @param tableCfs table-cfs config str
353    * @throws ReplicationException
354    */
355   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
356     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
357   }
358 
359   /**
360    * Append the replicable table-cf config of the specified peer
361    * @param id a short that identifies the cluster
362    * @param tableCfs A map from tableName to column family names
363    * @throws ReplicationException
364    */
365   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
366       throws ReplicationException {
367     if (tableCfs == null) {
368       throw new ReplicationException("tableCfs is null");
369     }
370     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
371     if (preTableCfs == null) {
372       setPeerTableCFs(id, tableCfs);
373       return;
374     }
375 
376     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
377       TableName table = entry.getKey();
378       Collection<String> appendCfs = entry.getValue();
379       if (preTableCfs.containsKey(table)) {
380         List<String> cfs = preTableCfs.get(table);
381         if (cfs == null || appendCfs == null) {
382           preTableCfs.put(table, null);
383         } else {
384           Set<String> cfSet = new HashSet<String>(cfs);
385           cfSet.addAll(appendCfs);
386           preTableCfs.put(table, Lists.newArrayList(cfSet));
387         }
388       } else {
389         if (appendCfs == null || appendCfs.isEmpty()) {
390           preTableCfs.put(table, null);
391         } else {
392           preTableCfs.put(table, Lists.newArrayList(appendCfs));
393         }
394       }
395     }
396     setPeerTableCFs(id, preTableCfs);
397   }
398 
399   /**
400    * Remove some table-cfs from table-cfs config of the specified peer
401    * @param id a short name that identifies the cluster
402    * @param tableCf table-cfs config str
403    * @throws ReplicationException
404    */
405   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
406     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
407   }
408 
409   /**
410    * Remove some table-cfs from config of the specified peer
411    * @param id a short name that identifies the cluster
412    * @param tableCfs A map from tableName to column family names
413    * @throws ReplicationException
414    */
415   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
416       throws ReplicationException {
417     if (tableCfs == null) {
418       throw new ReplicationException("tableCfs is null");
419     }
420 
421     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
422     if (preTableCfs == null) {
423       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
424     }
425     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
426       TableName table = entry.getKey();
427       Collection<String> removeCfs = entry.getValue();
428       if (preTableCfs.containsKey(table)) {
429         List<String> cfs = preTableCfs.get(table);
430         if (cfs == null && removeCfs == null) {
431           preTableCfs.remove(table);
432         } else if (cfs != null && removeCfs != null) {
433           Set<String> cfSet = new HashSet<String>(cfs);
434           cfSet.removeAll(removeCfs);
435           if (cfSet.isEmpty()) {
436             preTableCfs.remove(table);
437           } else {
438             preTableCfs.put(table, Lists.newArrayList(cfSet));
439           }
440         } else if (cfs == null && removeCfs != null) {
441           throw new ReplicationException("Cannot remove cf of table: " + table
442               + " which doesn't specify cfs from table-cfs config in peer: " + id);
443         } else if (cfs != null && removeCfs == null) {
444           throw new ReplicationException("Cannot remove table: " + table
445               + " which has specified cfs from table-cfs config in peer: " + id);
446         }
447       } else {
448         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
449       }
450     }
451     setPeerTableCFs(id, preTableCfs);
452   }
453 
454   /**
455    * Set the replicable table-cf config of the specified peer
456    * @param id a short name that identifies the cluster
457    * @param tableCfs the table and column-family list which will be replicated for this peer.
458    * A map from tableName to column family names. An empty collection can be passed
459    * to indicate replicating all column families. Pass null for replicating all table and column
460    * families
461    */
462   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
463       throws ReplicationException {
464     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
465   }
466 
467   /**
468    * Get the state of the specified peer cluster
469    * @param id String format of the Short name that identifies the peer,
470    * an IllegalArgumentException is thrown if it doesn't exist
471    * @return true if replication is enabled to that peer, false if it isn't
472    */
473   public boolean getPeerState(String id) throws ReplicationException {
474     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
475   }
476 
477   @Override
478   public void close() throws IOException {
479     if (this.zkw != null) {
480       this.zkw.close();
481     }
482     if (this.connection != null) {
483       this.connection.close();
484     }
485   }
486 
487 
488   /**
489    * Find all column families that are replicated from this cluster
490    * @return the full list of the replicated column families of this cluster as:
491    *        tableName, family name, replicationType
492    *
493    * Currently replicationType is Global. In the future, more replication
494    * types may be extended here. For example
495    *  1) the replication may only apply to selected peers instead of all peers
496    *  2) the replicationType may indicate the host Cluster servers as Slave
497    *     for the table:columnFam.
498    */
499   public List<HashMap<String, String>> listReplicated() throws IOException {
500     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
501 
502     Admin admin = connection.getAdmin();
503     HTableDescriptor[] tables;
504     try {
505       tables = admin.listTables();
506     } finally {
507       if (admin!= null) admin.close();
508     }
509 
510     for (HTableDescriptor table : tables) {
511       HColumnDescriptor[] columns = table.getColumnFamilies();
512       String tableName = table.getNameAsString();
513       for (HColumnDescriptor column : columns) {
514         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
515           // At this moment, the columfam is replicated to all peers
516           HashMap<String, String> replicationEntry = new HashMap<String, String>();
517           replicationEntry.put(TNAME, tableName);
518           replicationEntry.put(CFNAME, column.getNameAsString());
519           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
520           replicationColFams.add(replicationEntry);
521         }
522       }
523     }
524 
525     return replicationColFams;
526   }
527 
528   /**
529    * Enable a table's replication switch.
530    * @param tableName name of the table
531    * @throws IOException if a remote or network exception occurs
532    */
533   public void enableTableRep(final TableName tableName) throws IOException {
534     if (tableName == null) {
535       throw new IllegalArgumentException("Table name cannot be null");
536     }
537     try (Admin admin = this.connection.getAdmin()) {
538       if (!admin.tableExists(tableName)) {
539         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
540             + "' does not exists.");
541       }
542     }
543     byte[][] splits = getTableSplitRowKeys(tableName);
544     checkAndSyncTableDescToPeers(tableName, splits);
545     setTableRep(tableName, true);
546   }
547 
548   /**
549    * Disable a table's replication switch.
550    * @param tableName name of the table
551    * @throws IOException if a remote or network exception occurs
552    */
553   public void disableTableRep(final TableName tableName) throws IOException {
554     if (tableName == null) {
555       throw new IllegalArgumentException("Table name is null");
556     }
557     try (Admin admin = this.connection.getAdmin()) {
558       if (!admin.tableExists(tableName)) {
559         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
560             + "' does not exists.");
561       }
562     }
563     setTableRep(tableName, false);
564   }
565 
566   /**
567    * Get the split row keys of table
568    * @param tableName table name
569    * @return array of split row keys
570    * @throws IOException
571    */
572   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
573     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
574       byte[][] startKeys = locator.getStartKeys();
575       if (startKeys.length == 1) {
576         return null;
577       }
578       byte[][] splits = new byte[startKeys.length - 1][];
579       for (int i = 1; i < startKeys.length; i++) {
580         splits[i - 1] = startKeys[i];
581       }
582       return splits;
583     }
584   }
585 
586   /**
587    * Connect to peer and check the table descriptor on peer:
588    * <ol>
589    * <li>Create the same table on peer when not exist.</li>
590    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
591    * </ol>
592    * @param tableName name of the table to sync to the peer
593    * @param splits table split keys
594    * @throws IOException
595    */
596   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
597       throws IOException {
598     List<ReplicationPeer> repPeers = listValidReplicationPeers();
599     if (repPeers == null || repPeers.size() <= 0) {
600       throw new IllegalArgumentException("Found no peer cluster for replication.");
601     }
602     
603     final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
604     
605     for (ReplicationPeer repPeer : repPeers) {
606       Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
607       // TODO Currently peer TableCFs will not include namespace so we need to check only for table
608       // name without namespace in it. Need to correct this logic once we fix HBASE-11386.
609       if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
610         continue;
611       }
612 
613       Configuration peerConf = repPeer.getConfiguration();
614       HTableDescriptor htd = null;
615       try (Connection conn = ConnectionFactory.createConnection(peerConf);
616           Admin admin = this.connection.getAdmin();
617           Admin repHBaseAdmin = conn.getAdmin()) {
618         htd = admin.getTableDescriptor(tableName);
619         HTableDescriptor peerHtd = null;
620         if (!repHBaseAdmin.tableExists(tableName)) {
621           repHBaseAdmin.createTable(htd, splits);
622         } else {
623           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
624           if (peerHtd == null) {
625             throw new IllegalArgumentException("Failed to get table descriptor for table "
626                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
627           } else if (!peerHtd.equals(htd)) {
628             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
629                 + " exists in peer cluster " + repPeer.getId()
630                 + ", but the table descriptors are not same when comapred with source cluster."
631                 + " Thus can not enable the table's replication switch.");
632           }
633         }
634       }
635     }
636   }
637 
638   @VisibleForTesting
639   List<ReplicationPeer> listValidReplicationPeers() {
640     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
641     if (peers == null || peers.size() <= 0) {
642       return null;
643     }
644     List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
645     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
646       String peerId = peerEntry.getKey();
647       Stat s = null;
648       try {
649         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
650         Configuration peerConf = pair.getSecond();
651         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
652             parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
653         s =
654             zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
655               null);
656         if (null == s) {
657           LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
658           continue;
659         }
660         validPeers.add(peer);
661       } catch (ReplicationException e) {
662         LOG.warn("Failed to get valid replication peers. "
663             + "Error connecting to peer cluster with peerId=" + peerId);
664         LOG.debug("Failure details to get valid replication peers.", e);
665         continue;
666       } catch (KeeperException e) {
667         LOG.warn("Failed to get valid replication peers. KeeperException code="
668             + e.code().intValue());
669         LOG.debug("Failure details to get valid replication peers.", e);
670         continue;
671       } catch (InterruptedException e) {
672         LOG.warn("Failed to get valid replication peers due to InterruptedException.");
673         LOG.debug("Failure details to get valid replication peers.", e);
674         Thread.currentThread().interrupt();
675         continue;
676       }
677     }
678     return validPeers;
679   }
680 
681   /**
682    * Set the table's replication switch if the table's replication switch is already not set.
683    * @param tableName name of the table
684    * @param isRepEnabled is replication switch enable or disable
685    * @throws IOException if a remote or network exception occurs
686    */
687   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
688     Admin admin = null;
689     try {
690       admin = this.connection.getAdmin();
691       HTableDescriptor htd = admin.getTableDescriptor(tableName);
692       if (isTableRepEnabled(htd) ^ isRepEnabled) {
693         boolean isOnlineSchemaUpdateEnabled =
694             this.connection.getConfiguration()
695                 .getBoolean("hbase.online.schema.update.enable", true);
696         if (!isOnlineSchemaUpdateEnabled) {
697           admin.disableTable(tableName);
698         }
699         for (HColumnDescriptor hcd : htd.getFamilies()) {
700           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
701               : HConstants.REPLICATION_SCOPE_LOCAL);
702         }
703         admin.modifyTable(tableName, htd);
704         if (!isOnlineSchemaUpdateEnabled) {
705           admin.enableTable(tableName);
706         }
707       }
708     } finally {
709       if (admin != null) {
710         try {
711           admin.close();
712         } catch (IOException e) {
713           LOG.warn("Failed to close admin connection.");
714           LOG.debug("Details on failure to close admin connection.", e);
715         }
716       }
717     }
718   }
719 
720   /**
721    * @param htd table descriptor details for the table to check
722    * @return true if table's replication switch is enabled
723    */
724   private boolean isTableRepEnabled(HTableDescriptor htd) {
725     for (HColumnDescriptor hcd : htd.getFamilies()) {
726       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
727         return false;
728       }
729     }
730     return true;
731   }
732 }