View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Abortable;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.TableNotFoundException;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.classification.InterfaceStability;
46  import org.apache.hadoop.hbase.client.Admin;
47  import org.apache.hadoop.hbase.client.Connection;
48  import org.apache.hadoop.hbase.client.ConnectionFactory;
49  import org.apache.hadoop.hbase.client.RegionLocator;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationFactory;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer;
53  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
57  import org.apache.hadoop.hbase.util.Pair;
58  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
59  
60  /**
61   * <p>
62   * This class provides the administrative interface to HBase cluster
63   * replication. In order to use it, the cluster and the client using
64   * ReplicationAdmin must be configured with <code>hbase.replication</code>
65   * set to true.
66   * </p>
67   * <p>
68   * Adding a new peer results in creating new outbound connections from every
69   * region server to a subset of region servers on the slave cluster. Each
70   * new stream of replication will start replicating from the beginning of the
71   * current WAL, meaning that edits from that past will be replicated.
72   * </p>
73   * <p>
74   * Removing a peer is a destructive and irreversible operation that stops
75   * all the replication streams for the given cluster and deletes the metadata
76   * used to keep track of the replication state.
77   * </p>
78   * <p>
79   * To see which commands are available in the shell, type
80   * <code>replication</code>.
81   * </p>
82   */
83  @InterfaceAudience.Public
84  @InterfaceStability.Evolving
85  public class ReplicationAdmin implements Closeable {
86    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
87  
88    public static final String TNAME = "tableName";
89    public static final String CFNAME = "columnFamilyName";
90  
91    // only Global for now, can add other type
92    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
93    public static final String REPLICATIONTYPE = "replicationType";
94    public static final String REPLICATIONGLOBAL = Integer
95        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
96  
97    private final Connection connection;
98    // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
99    // be moved to hbase-server. Resolve it in HBASE-11392.
100   private final ReplicationQueuesClient replicationQueuesClient;
101   private final ReplicationPeers replicationPeers;
102   /**
103    * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
104    * on {@link #close()}.
105    */
106   private final ZooKeeperWatcher zkw;
107 
108   /**
109    * Constructor that creates a connection to the local ZooKeeper ensemble.
110    * @param conf Configuration to use
111    * @throws IOException if an internal replication error occurs
112    * @throws RuntimeException if replication isn't enabled.
113    */
114   public ReplicationAdmin(Configuration conf) throws IOException {
115     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
116         HConstants.REPLICATION_ENABLE_DEFAULT)) {
117       throw new RuntimeException("hbase.replication isn't true, please " +
118           "enable it in order to use replication");
119     }
120     this.connection = ConnectionFactory.createConnection(conf);
121     try {
122       zkw = createZooKeeperWatcher();
123       try {
124         this.replicationQueuesClient =
125             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
126         this.replicationQueuesClient.init();
127         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
128           this.replicationQueuesClient, this.connection);
129         this.replicationPeers.init();
130       } catch (Exception exception) {
131         if (zkw != null) {
132           zkw.close();
133         }
134         throw exception;
135       }
136     } catch (Exception exception) {
137       if (connection != null) {
138         connection.close();
139       }
140       if (exception instanceof IOException) {
141         throw (IOException) exception;
142       } else if (exception instanceof RuntimeException) {
143         throw (RuntimeException) exception;
144       } else {
145         throw new IOException("Error initializing the replication admin client.", exception);
146       }
147     }
148   }
149 
150   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
151     // This Abortable doesn't 'abort'... it just logs.
152     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
153       @Override
154       public void abort(String why, Throwable e) {
155         LOG.error(why, e);
156         // We used to call system.exit here but this script can be embedded by other programs that
157         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
158       }
159 
160       @Override
161       public boolean isAborted() {
162         return false;
163       }
164     });
165   }
166 
167   /**
168    * Add a new remote slave cluster for replication.
169    * @param id a short name that identifies the cluster
170    * @param peerConfig configuration for the replication slave cluster
171    * @param tableCfs the table and column-family list which will be replicated for this peer.
172    * A map from tableName to column family names. An empty collection can be passed
173    * to indicate replicating all column families. Pass null for replicating all table and column
174    * families
175    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
176    * use {@link #addPeer(String, ReplicationPeerConfig)} instead.
177    */
178   @Deprecated
179   public void addPeer(String id, ReplicationPeerConfig peerConfig,
180       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
181     if (tableCfs != null) {
182       peerConfig.setTableCFsMap(tableCfs);
183     }
184     this.replicationPeers.addPeer(id, peerConfig);
185   }
186 
187   /**
188    * Add a new remote slave cluster for replication.
189    * @param id a short name that identifies the cluster
190    * @param peerConfig configuration for the replication slave cluster
191    */
192   public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
193     this.replicationPeers.addPeer(id, peerConfig);
194   }
195 
196   /**
197    *  @deprecated as release of 2.0.0, and it will be removed in 3.0.0
198    * */
199   @Deprecated
200   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
201     return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
202   }
203 
204   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
205       throws ReplicationException {
206     this.replicationPeers.updatePeerConfig(id, peerConfig);
207   }
208   /**
209    * Removes a peer cluster and stops the replication to it.
210    * @param id a short name that identifies the cluster
211    */
212   public void removePeer(String id) throws ReplicationException {
213     this.replicationPeers.removePeer(id);
214   }
215 
216   /**
217    * Restart the replication stream to the specified peer.
218    * @param id a short name that identifies the cluster
219    */
220   public void enablePeer(String id) throws ReplicationException {
221     this.replicationPeers.enablePeer(id);
222   }
223 
224   /**
225    * Stop the replication stream to the specified peer.
226    * @param id a short name that identifies the cluster
227    */
228   public void disablePeer(String id) throws ReplicationException {
229     this.replicationPeers.disablePeer(id);
230   }
231 
232   /**
233    * Get the number of slave clusters the local cluster has.
234    * @return number of slave clusters
235    */
236   public int getPeersCount() {
237     return this.replicationPeers.getAllPeerIds().size();
238   }
239 
240   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
241     return this.replicationPeers.getAllPeerConfigs();
242   }
243 
244   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
245     return this.replicationPeers.getReplicationPeerConfig(id);
246   }
247 
248   /**
249    * Get the replicable table-cf config of the specified peer.
250    * @param id a short name that identifies the cluster
251    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
252    * use {@link #getPeerConfig(String)} instead.
253    * */
254   @Deprecated
255   public String getPeerTableCFs(String id) throws ReplicationException {
256     return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id));
257   }
258 
259   /**
260    * Append the replicable table-cf config of the specified peer
261    * @param id a short that identifies the cluster
262    * @param tableCfs table-cfs config str
263    * @throws ReplicationException
264    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
265    * use {@link #appendPeerTableCFs(String, Map)} instead.
266    */
267   @Deprecated
268   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
269     appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
270   }
271 
272   /**
273    * Append the replicable table-cf config of the specified peer
274    * @param id a short that identifies the cluster
275    * @param tableCfs A map from tableName to column family names
276    * @throws ReplicationException
277    */
278   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
279       throws ReplicationException {
280     if (tableCfs == null) {
281       throw new ReplicationException("tableCfs is null");
282     }
283     Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
284     if (preTableCfs == null) {
285       setPeerTableCFs(id, tableCfs);
286       return;
287     }
288     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
289       TableName table = entry.getKey();
290       Collection<String> appendCfs = entry.getValue();
291       if (preTableCfs.containsKey(table)) {
292         List<String> cfs = preTableCfs.get(table);
293         if (cfs == null || appendCfs == null) {
294           preTableCfs.put(table, null);
295         } else {
296           Set<String> cfSet = new HashSet<String>(cfs);
297           cfSet.addAll(appendCfs);
298           preTableCfs.put(table, Lists.newArrayList(cfSet));
299 
300         }
301       } else {
302         if (appendCfs == null || appendCfs.isEmpty()) {
303           preTableCfs.put(table, null);
304         } else {
305           preTableCfs.put(table, Lists.newArrayList(appendCfs));
306         }
307       }
308     }
309     setPeerTableCFs(id, preTableCfs);
310   }
311 
312   /**
313    * Remove some table-cfs from table-cfs config of the specified peer
314    * @param id a short name that identifies the cluster
315    * @param tableCf table-cfs config str
316    * @throws ReplicationException
317    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
318    * use {@link #removePeerTableCFs(String, Map)} instead.
319    */
320   @Deprecated
321   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
322     removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
323   }
324 
325   /**
326    * Remove some table-cfs from config of the specified peer
327    * @param id a short name that identifies the cluster
328    * @param tableCfs A map from tableName to column family names
329    * @throws ReplicationException
330    */
331   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
332       throws ReplicationException {
333     if (tableCfs == null) {
334       throw new ReplicationException("tableCfs is null");
335     }
336     Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
337     if (preTableCfs == null) {
338       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
339     }
340     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
341 
342       TableName table = entry.getKey();
343       Collection<String> removeCfs = entry.getValue();
344       if (preTableCfs.containsKey(table)) {
345         List<String> cfs = preTableCfs.get(table);
346         if (cfs == null && removeCfs == null) {
347           preTableCfs.remove(table);
348         } else if (cfs != null && removeCfs != null) {
349           Set<String> cfSet = new HashSet<String>(cfs);
350           cfSet.removeAll(removeCfs);
351           if (cfSet.isEmpty()) {
352             preTableCfs.remove(table);
353           } else {
354             preTableCfs.put(table, Lists.newArrayList(cfSet));
355           }
356         } else if (cfs == null && removeCfs != null) {
357           throw new ReplicationException("Cannot remove cf of table: " + table
358               + " which doesn't specify cfs from table-cfs config in peer: " + id);
359         } else if (cfs != null && removeCfs == null) {
360           throw new ReplicationException("Cannot remove table: " + table
361               + " which has specified cfs from table-cfs config in peer: " + id);
362         }
363       } else {
364         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
365 
366       }
367     }
368     setPeerTableCFs(id, preTableCfs);
369   }
370 
371   /**
372    * Set the replicable table-cf config of the specified peer
373    * @param id a short name that identifies the cluster
374    * @param tableCfs the table and column-family list which will be replicated for this peer.
375    * A map from tableName to column family names. An empty collection can be passed
376    * to indicate replicating all column families. Pass null for replicating all table and column
377    * families
378    */
379   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
380       throws ReplicationException {
381     this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
382   }
383 
384   /**
385    * Get the state of the specified peer cluster
386    * @param id String format of the Short name that identifies the peer,
387    * an IllegalArgumentException is thrown if it doesn't exist
388    * @return true if replication is enabled to that peer, false if it isn't
389    */
390   public boolean getPeerState(String id) throws ReplicationException {
391     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
392   }
393 
394   @Override
395   public void close() throws IOException {
396     if (this.zkw != null) {
397       this.zkw.close();
398     }
399     if (this.connection != null) {
400       this.connection.close();
401     }
402   }
403 
404 
405   /**
406    * Find all column families that are replicated from this cluster
407    * @return the full list of the replicated column families of this cluster as:
408    *        tableName, family name, replicationType
409    *
410    * Currently replicationType is Global. In the future, more replication
411    * types may be extended here. For example
412    *  1) the replication may only apply to selected peers instead of all peers
413    *  2) the replicationType may indicate the host Cluster servers as Slave
414    *     for the table:columnFam.
415    */
416   public List<HashMap<String, String>> listReplicated() throws IOException {
417     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
418 
419     Admin admin = connection.getAdmin();
420     HTableDescriptor[] tables;
421     try {
422       tables = admin.listTables();
423     } finally {
424       if (admin!= null) admin.close();
425     }
426 
427     for (HTableDescriptor table : tables) {
428       HColumnDescriptor[] columns = table.getColumnFamilies();
429       String tableName = table.getNameAsString();
430       for (HColumnDescriptor column : columns) {
431         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
432           // At this moment, the columfam is replicated to all peers
433           HashMap<String, String> replicationEntry = new HashMap<String, String>();
434           replicationEntry.put(TNAME, tableName);
435           replicationEntry.put(CFNAME, column.getNameAsString());
436           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
437           replicationColFams.add(replicationEntry);
438         }
439       }
440     }
441 
442     return replicationColFams;
443   }
444 
445   /**
446    * Enable a table's replication switch.
447    * @param tableName name of the table
448    * @throws IOException if a remote or network exception occurs
449    */
450   public void enableTableRep(final TableName tableName) throws IOException {
451     if (tableName == null) {
452       throw new IllegalArgumentException("Table name cannot be null");
453     }
454     try (Admin admin = this.connection.getAdmin()) {
455       if (!admin.tableExists(tableName)) {
456         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
457             + "' does not exists.");
458       }
459     }
460     byte[][] splits = getTableSplitRowKeys(tableName);
461     checkAndSyncTableDescToPeers(tableName, splits);
462     setTableRep(tableName, true);
463   }
464 
465   /**
466    * Disable a table's replication switch.
467    * @param tableName name of the table
468    * @throws IOException if a remote or network exception occurs
469    */
470   public void disableTableRep(final TableName tableName) throws IOException {
471     if (tableName == null) {
472       throw new IllegalArgumentException("Table name is null");
473     }
474     try (Admin admin = this.connection.getAdmin()) {
475       if (!admin.tableExists(tableName)) {
476         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
477             + "' does not exists.");
478       }
479     }
480     setTableRep(tableName, false);
481   }
482 
483   /**
484    * Get the split row keys of table
485    * @param tableName table name
486    * @return array of split row keys
487    * @throws IOException
488    */
489   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
490     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
491       byte[][] startKeys = locator.getStartKeys();
492       if (startKeys.length == 1) {
493         return null;
494       }
495       byte[][] splits = new byte[startKeys.length - 1][];
496       for (int i = 1; i < startKeys.length; i++) {
497         splits[i - 1] = startKeys[i];
498       }
499       return splits;
500     }
501   }
502 
503   /**
504    * Connect to peer and check the table descriptor on peer:
505    * <ol>
506    * <li>Create the same table on peer when not exist.</li>
507    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
508    * </ol>
509    * @param tableName name of the table to sync to the peer
510    * @param splits table split keys
511    * @throws IOException
512    */
513   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
514       throws IOException {
515     List<ReplicationPeer> repPeers = listReplicationPeers();
516     if (repPeers == null || repPeers.size() <= 0) {
517       throw new IllegalArgumentException("Found no peer cluster for replication.");
518     }
519     
520     final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
521     
522     for (ReplicationPeer repPeer : repPeers) {
523       Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
524       // TODO Currently peer TableCFs will not include namespace so we need to check only for table
525       // name without namespace in it. Need to correct this logic once we fix HBASE-11386.
526       if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
527         continue;
528       }
529 
530       Configuration peerConf = repPeer.getConfiguration();
531       HTableDescriptor htd = null;
532       try (Connection conn = ConnectionFactory.createConnection(peerConf);
533           Admin admin = this.connection.getAdmin();
534           Admin repHBaseAdmin = conn.getAdmin()) {
535         htd = admin.getTableDescriptor(tableName);
536         HTableDescriptor peerHtd = null;
537         if (!repHBaseAdmin.tableExists(tableName)) {
538           repHBaseAdmin.createTable(htd, splits);
539         } else {
540           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
541           if (peerHtd == null) {
542             throw new IllegalArgumentException("Failed to get table descriptor for table "
543                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
544           } else if (!peerHtd.equals(htd)) {
545             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
546                 + " exists in peer cluster " + repPeer.getId()
547                 + ", but the table descriptors are not same when comapred with source cluster."
548                 + " Thus can not enable the table's replication switch.");
549           }
550         }
551       }
552     }
553   }
554 
555   @VisibleForTesting
556   public void peerAdded(String id) throws ReplicationException {
557     this.replicationPeers.peerAdded(id);
558   }
559 
560   @VisibleForTesting
561   List<ReplicationPeer> listReplicationPeers() {
562     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
563     if (peers == null || peers.size() <= 0) {
564       return null;
565     }
566     List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
567     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
568       String peerId = peerEntry.getKey();
569       try {
570         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
571         Configuration peerConf = pair.getSecond();
572         ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
573           peerId, pair.getFirst(), this.connection);
574         listOfPeers.add(peer);
575       } catch (ReplicationException e) {
576         LOG.warn("Failed to get valid replication peers. "
577             + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
578             + e.getMessage());
579         LOG.debug("Failure details to get valid replication peers.", e);
580         continue;
581       }
582     }
583     return listOfPeers;
584   }
585 
586   /**
587    * Set the table's replication switch if the table's replication switch is already not set.
588    * @param tableName name of the table
589    * @param isRepEnabled is replication switch enable or disable
590    * @throws IOException if a remote or network exception occurs
591    */
592   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
593     Admin admin = null;
594     try {
595       admin = this.connection.getAdmin();
596       HTableDescriptor htd = admin.getTableDescriptor(tableName);
597       if (isTableRepEnabled(htd) ^ isRepEnabled) {
598         boolean isOnlineSchemaUpdateEnabled =
599             this.connection.getConfiguration()
600                 .getBoolean("hbase.online.schema.update.enable", true);
601         if (!isOnlineSchemaUpdateEnabled) {
602           admin.disableTable(tableName);
603         }
604         for (HColumnDescriptor hcd : htd.getFamilies()) {
605           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
606               : HConstants.REPLICATION_SCOPE_LOCAL);
607         }
608         admin.modifyTable(tableName, htd);
609         if (!isOnlineSchemaUpdateEnabled) {
610           admin.enableTable(tableName);
611         }
612       }
613     } finally {
614       if (admin != null) {
615         try {
616           admin.close();
617         } catch (IOException e) {
618           LOG.warn("Failed to close admin connection.");
619           LOG.debug("Details on failure to close admin connection.", e);
620         }
621       }
622     }
623   }
624 
625   /**
626    * @param htd table descriptor details for the table to check
627    * @return true if table's replication switch is enabled
628    */
629   private boolean isTableRepEnabled(HTableDescriptor htd) {
630     for (HColumnDescriptor hcd : htd.getFamilies()) {
631       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
632         return false;
633       }
634     }
635     return true;
636   }
637 }