View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Abortable;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.TableNotFoundException;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.classification.InterfaceStability;
46  import org.apache.hadoop.hbase.client.Admin;
47  import org.apache.hadoop.hbase.client.Connection;
48  import org.apache.hadoop.hbase.client.ConnectionFactory;
49  import org.apache.hadoop.hbase.client.RegionLocator;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationFactory;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer;
53  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
57  import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
58  import org.apache.hadoop.hbase.util.Pair;
59  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
60
61  /**
62   * <p>
63   * This class provides the administrative interface to HBase cluster
64   * replication. In order to use it, the cluster and the client using
65   * ReplicationAdmin must be configured with <code>hbase.replication</code>
66   * set to true.
67   * </p>
68   * <p>
69   * Adding a new peer results in creating new outbound connections from every
70   * region server to a subset of region servers on the slave cluster. Each
71   * new stream of replication will start replicating from the beginning of the
72   * current WAL, meaning that edits from the past will be replicated.
73   * </p>
74   * <p>
75   * Removing a peer is a destructive and irreversible operation that stops
76   * all the replication streams for the given cluster and deletes the metadata
77   * used to keep track of the replication state.
78   * </p>
79   * <p>
80   * To see which commands are available in the shell, type
81   * <code>replication</code>.
82   * </p>
83   */
84  @InterfaceAudience.Public
85  @InterfaceStability.Evolving
86  public class ReplicationAdmin implements Closeable {
87    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
88
89    public static final String TNAME = "tableName";
90    public static final String CFNAME = "columnFamilyName";
91
92    // only Global for now, can add other type
93    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
94    public static final String REPLICATIONTYPE = "replicationType";
95    public static final String REPLICATIONGLOBAL =
96        Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
97    public static final String REPLICATIONSERIAL =
98        Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
99
100   private final Connection connection;
101   // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
102   // be moved to hbase-server. Resolve it in HBASE-11392.
103   private final ReplicationQueuesClient replicationQueuesClient;
104   private final ReplicationPeers replicationPeers;
105   /**
106    * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
107    * on {@link #close()}.
108    */
109   private final ZooKeeperWatcher zkw;
110
111   /**
112    * Constructor that creates a connection to the local ZooKeeper ensemble.
113    * @param conf Configuration to use
114    * @throws IOException if an internal replication error occurs
115    * @throws RuntimeException if replication isn't enabled.
116    */
117   public ReplicationAdmin(Configuration conf) throws IOException {
118     this.connection = ConnectionFactory.createConnection(conf);
119     try {
120       zkw = createZooKeeperWatcher();
121       try {
122         this.replicationQueuesClient =
123             ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf,
124             this.connection, zkw));
125         this.replicationQueuesClient.init();
126         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
127           this.replicationQueuesClient, this.connection);
128         this.replicationPeers.init();
129       } catch (Exception exception) {
130         if (zkw != null) {
131           zkw.close();
132         }
133         throw exception;
134       }
135     } catch (Exception exception) {
136       if (connection != null) {
137         connection.close();
138       }
139       if (exception instanceof IOException) {
140         throw (IOException) exception;
141       } else if (exception instanceof RuntimeException) {
142         throw (RuntimeException) exception;
143       } else {
144         throw new IOException("Error initializing the replication admin client.", exception);
145       }
146     }
147   }
148
149   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
150     // This Abortable doesn't 'abort'... it just logs.
151     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
152       @Override
153       public void abort(String why, Throwable e) {
154         LOG.error(why, e);
155         // We used to call system.exit here but this script can be embedded by other programs that
156         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
157       }
158
159       @Override
160       public boolean isAborted() {
161         return false;
162       }
163     });
164   }
165
166   /**
167    * Add a new remote slave cluster for replication.
168    * @param id a short name that identifies the cluster
169    * @param peerConfig configuration for the replication slave cluster
170    * @param tableCfs the table and column-family list which will be replicated for this peer.
171    * A map from tableName to column family names. An empty collection can be passed
172    * to indicate replicating all column families. Pass null for replicating all table and column
173    * families
174    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
175    * use {@link #addPeer(String, ReplicationPeerConfig)} instead.
176    */
177   @Deprecated
178   public void addPeer(String id, ReplicationPeerConfig peerConfig,
179       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
180     if (tableCfs != null) {
181       peerConfig.setTableCFsMap(tableCfs);
182     }
183     this.replicationPeers.registerPeer(id, peerConfig);
184   }
185
186   /**
187    * Add a new remote slave cluster for replication.
188    * @param id a short name that identifies the cluster
189    * @param peerConfig configuration for the replication slave cluster
190    */
191   public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
192     this.replicationPeers.registerPeer(id, peerConfig);
193   }
194
195   /**
196    *  @deprecated as release of 2.0.0, and it will be removed in 3.0.0
197    * */
198   @Deprecated
199   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
200     return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
201   }
202
203   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
204       throws ReplicationException {
205     this.replicationPeers.updatePeerConfig(id, peerConfig);
206   }
207   /**
208    * Removes a peer cluster and stops the replication to it.
209    * @param id a short name that identifies the cluster
210    */
211   public void removePeer(String id) throws ReplicationException {
212     this.replicationPeers.unregisterPeer(id);
213   }
214
215   /**
216    * Restart the replication stream to the specified peer.
217    * @param id a short name that identifies the cluster
218    */
219   public void enablePeer(String id) throws ReplicationException {
220     this.replicationPeers.enablePeer(id);
221   }
222
223   /**
224    * Stop the replication stream to the specified peer.
225    * @param id a short name that identifies the cluster
226    */
227   public void disablePeer(String id) throws ReplicationException {
228     this.replicationPeers.disablePeer(id);
229   }
230
231   /**
232    * Get the number of slave clusters the local cluster has.
233    * @return number of slave clusters
234    */
235   public int getPeersCount() {
236     return this.replicationPeers.getAllPeerIds().size();
237   }
238
239   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
240     return this.replicationPeers.getAllPeerConfigs();
241   }
242
243   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
244     return this.replicationPeers.getReplicationPeerConfig(id);
245   }
246
247   /**
248    * Get the replicable table-cf config of the specified peer.
249    * @param id a short name that identifies the cluster
250    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
251    * use {@link #getPeerConfig(String)} instead.
252    * */
253   @Deprecated
254   public String getPeerTableCFs(String id) throws ReplicationException {
255     return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id));
256   }
257
258   /**
259    * Append the replicable table-cf config of the specified peer
260    * @param id a short that identifies the cluster
261    * @param tableCfs table-cfs config str
262    * @throws ReplicationException
263    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
264    * use {@link #appendPeerTableCFs(String, Map)} instead.
265    */
266   @Deprecated
267   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
268     appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
269   }
270
271   /**
272    * Append the replicable table-cf config of the specified peer
273    * @param id a short that identifies the cluster
274    * @param tableCfs A map from tableName to column family names
275    * @throws ReplicationException
276    */
277   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
278       throws ReplicationException {
279     if (tableCfs == null) {
280       throw new ReplicationException("tableCfs is null");
281     }
282     Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
283     if (preTableCfs == null) {
284       setPeerTableCFs(id, tableCfs);
285       return;
286     }
287     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
288       TableName table = entry.getKey();
289       Collection<String> appendCfs = entry.getValue();
290       if (preTableCfs.containsKey(table)) {
291         List<String> cfs = preTableCfs.get(table);
292         if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
293           preTableCfs.put(table, null);
294         } else {
295           Set<String> cfSet = new HashSet<String>(cfs);
296           cfSet.addAll(appendCfs);
297           preTableCfs.put(table, Lists.newArrayList(cfSet));
298         }
299       } else {
300         if (appendCfs == null || appendCfs.isEmpty()) {
301           preTableCfs.put(table, null);
302         } else {
303           preTableCfs.put(table, Lists.newArrayList(appendCfs));
304         }
305       }
306     }
307     setPeerTableCFs(id, preTableCfs);
308   }
309
310   /**
311    * Remove some table-cfs from table-cfs config of the specified peer
312    * @param id a short name that identifies the cluster
313    * @param tableCf table-cfs config str
314    * @throws ReplicationException
315    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
316    * use {@link #removePeerTableCFs(String, Map)} instead.
317    */
318   @Deprecated
319   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
320     removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
321   }
322
323   /**
324    * Remove some table-cfs from config of the specified peer
325    * @param id a short name that identifies the cluster
326    * @param tableCfs A map from tableName to column family names
327    * @throws ReplicationException
328    */
329   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
330       throws ReplicationException {
331     if (tableCfs == null) {
332       throw new ReplicationException("tableCfs is null");
333     }
334     Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
335     if (preTableCfs == null) {
336       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
337     }
338     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
339
340       TableName table = entry.getKey();
341       Collection<String> removeCfs = entry.getValue();
342       if (preTableCfs.containsKey(table)) {
343         List<String> cfs = preTableCfs.get(table);
344         if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
345           preTableCfs.remove(table);
346         } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
347           Set<String> cfSet = new HashSet<String>(cfs);
348           cfSet.removeAll(removeCfs);
349           if (cfSet.isEmpty()) {
350             preTableCfs.remove(table);
351           } else {
352             preTableCfs.put(table, Lists.newArrayList(cfSet));
353           }
354         } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
355           throw new ReplicationException("Cannot remove cf of table: " + table
356               + " which doesn't specify cfs from table-cfs config in peer: " + id);
357         } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
358           throw new ReplicationException("Cannot remove table: " + table
359               + " which has specified cfs from table-cfs config in peer: " + id);
360         }
361       } else {
362         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
363
364       }
365     }
366     setPeerTableCFs(id, preTableCfs);
367   }
368
369   /**
370    * Set the replicable table-cf config of the specified peer
371    * @param id a short name that identifies the cluster
372    * @param tableCfs the table and column-family list which will be replicated for this peer.
373    * A map from tableName to column family names. An empty collection can be passed
374    * to indicate replicating all column families. Pass null for replicating all table and column
375    * families
376    */
377   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
378       throws ReplicationException {
379     this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
380   }
381
382   /**
383    * Get the state of the specified peer cluster
384    * @param id String format of the Short name that identifies the peer,
385    * an IllegalArgumentException is thrown if it doesn't exist
386    * @return true if replication is enabled to that peer, false if it isn't
387    */
388   public boolean getPeerState(String id) throws ReplicationException {
389     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
390   }
391
392   @Override
393   public void close() throws IOException {
394     if (this.zkw != null) {
395       this.zkw.close();
396     }
397     if (this.connection != null) {
398       this.connection.close();
399     }
400   }
401
402
403   /**
404    * Find all column families that are replicated from this cluster
405    * @return the full list of the replicated column families of this cluster as:
406    *        tableName, family name, replicationType
407    *
408    * Currently replicationType is Global. In the future, more replication
409    * types may be extended here. For example
410    *  1) the replication may only apply to selected peers instead of all peers
411    *  2) the replicationType may indicate the host Cluster servers as Slave
412    *     for the table:columnFam.
413    */
414   public List<HashMap<String, String>> listReplicated() throws IOException {
415     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
416
417     Admin admin = connection.getAdmin();
418     HTableDescriptor[] tables;
419     try {
420       tables = admin.listTables();
421     } finally {
422       if (admin!= null) admin.close();
423     }
424
425     for (HTableDescriptor table : tables) {
426       HColumnDescriptor[] columns = table.getColumnFamilies();
427       String tableName = table.getNameAsString();
428       for (HColumnDescriptor column : columns) {
429         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
430           // At this moment, the columfam is replicated to all peers
431           HashMap<String, String> replicationEntry = new HashMap<String, String>();
432           replicationEntry.put(TNAME, tableName);
433           replicationEntry.put(CFNAME, column.getNameAsString());
434           replicationEntry.put(REPLICATIONTYPE,
435               column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
436                   REPLICATIONGLOBAL :
437                   REPLICATIONSERIAL);
438           replicationColFams.add(replicationEntry);
439         }
440       }
441     }
442
443     return replicationColFams;
444   }
445
446   /**
447    * Enable a table's replication switch.
448    * @param tableName name of the table
449    * @throws IOException if a remote or network exception occurs
450    */
451   public void enableTableRep(final TableName tableName) throws IOException {
452     if (tableName == null) {
453       throw new IllegalArgumentException("Table name cannot be null");
454     }
455     try (Admin admin = this.connection.getAdmin()) {
456       if (!admin.tableExists(tableName)) {
457         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
458             + "' does not exists.");
459       }
460     }
461     byte[][] splits = getTableSplitRowKeys(tableName);
462     checkAndSyncTableDescToPeers(tableName, splits);
463     setTableRep(tableName, true);
464   }
465
466   /**
467    * Disable a table's replication switch.
468    * @param tableName name of the table
469    * @throws IOException if a remote or network exception occurs
470    */
471   public void disableTableRep(final TableName tableName) throws IOException {
472     if (tableName == null) {
473       throw new IllegalArgumentException("Table name is null");
474     }
475     try (Admin admin = this.connection.getAdmin()) {
476       if (!admin.tableExists(tableName)) {
477         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
478             + "' does not exists.");
479       }
480     }
481     setTableRep(tableName, false);
482   }
483
484   /**
485    * Get the split row keys of table
486    * @param tableName table name
487    * @return array of split row keys
488    * @throws IOException
489    */
490   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
491     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
492       byte[][] startKeys = locator.getStartKeys();
493       if (startKeys.length == 1) {
494         return null;
495       }
496       byte[][] splits = new byte[startKeys.length - 1][];
497       for (int i = 1; i < startKeys.length; i++) {
498         splits[i - 1] = startKeys[i];
499       }
500       return splits;
501     }
502   }
503
504   /**
505    * Connect to peer and check the table descriptor on peer:
506    * <ol>
507    * <li>Create the same table on peer when not exist.</li>
508    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
509    * </ol>
510    * @param tableName name of the table to sync to the peer
511    * @param splits table split keys
512    * @throws IOException
513    */
514   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
515       throws IOException {
516     List<ReplicationPeer> repPeers = listReplicationPeers();
517     if (repPeers == null || repPeers.size() <= 0) {
518       throw new IllegalArgumentException("Found no peer cluster for replication.");
519     }
520
521     final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
522
523     for (ReplicationPeer repPeer : repPeers) {
524       Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
525       // TODO Currently peer TableCFs will not include namespace so we need to check only for table
526       // name without namespace in it. Need to correct this logic once we fix HBASE-11386.
527       if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
528         continue;
529       }
530
531       Configuration peerConf = repPeer.getConfiguration();
532       HTableDescriptor htd = null;
533       try (Connection conn = ConnectionFactory.createConnection(peerConf);
534           Admin admin = this.connection.getAdmin();
535           Admin repHBaseAdmin = conn.getAdmin()) {
536         htd = admin.getTableDescriptor(tableName);
537         HTableDescriptor peerHtd = null;
538         if (!repHBaseAdmin.tableExists(tableName)) {
539           repHBaseAdmin.createTable(htd, splits);
540         } else {
541           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
542           if (peerHtd == null) {
543             throw new IllegalArgumentException("Failed to get table descriptor for table "
544                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
545           } else if (!peerHtd.equals(htd)) {
546             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
547                 + " exists in peer cluster " + repPeer.getId()
548                 + ", but the table descriptors are not same when comapred with source cluster."
549                 + " Thus can not enable the table's replication switch.");
550           }
551         }
552       }
553     }
554   }
555
556   @VisibleForTesting
557   public void peerAdded(String id) throws ReplicationException {
558     this.replicationPeers.peerConnected(id);
559   }
560
561   @VisibleForTesting
562   List<ReplicationPeer> listReplicationPeers() {
563     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
564     if (peers == null || peers.size() <= 0) {
565       return null;
566     }
567     List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
568     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
569       String peerId = peerEntry.getKey();
570       try {
571         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
572         Configuration peerConf = pair.getSecond();
573         ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
574           peerId, pair.getFirst(), this.connection);
575         listOfPeers.add(peer);
576       } catch (ReplicationException e) {
577         LOG.warn("Failed to get valid replication peers. "
578             + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
579             + e.getMessage());
580         LOG.debug("Failure details to get valid replication peers.", e);
581         continue;
582       }
583     }
584     return listOfPeers;
585   }
586
587   /**
588    * Set the table's replication switch if the table's replication switch is already not set.
589    * @param tableName name of the table
590    * @param isRepEnabled is replication switch enable or disable
591    * @throws IOException if a remote or network exception occurs
592    */
593   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
594     Admin admin = null;
595     try {
596       admin = this.connection.getAdmin();
597       HTableDescriptor htd = admin.getTableDescriptor(tableName);
598       if (isTableRepEnabled(htd) ^ isRepEnabled) {
599         for (HColumnDescriptor hcd : htd.getFamilies()) {
600           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
601               : HConstants.REPLICATION_SCOPE_LOCAL);
602         }
603         admin.modifyTable(tableName, htd);
604       }
605     } finally {
606       if (admin != null) {
607         try {
608           admin.close();
609         } catch (IOException e) {
610           LOG.warn("Failed to close admin connection.");
611           LOG.debug("Details on failure to close admin connection.", e);
612         }
613       }
614     }
615   }
616
617   /**
618    * @param htd table descriptor details for the table to check
619    * @return true if table's replication switch is enabled
620    */
621   private boolean isTableRepEnabled(HTableDescriptor htd) {
622     for (HColumnDescriptor hcd : htd.getFamilies()) {
623       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
624           && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
625         return false;
626       }
627     }
628     return true;
629   }
630 }