View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.classification.InterfaceStability;
43  import org.apache.hadoop.hbase.client.Admin;
44  import org.apache.hadoop.hbase.client.HBaseAdmin;
45  import org.apache.hadoop.hbase.client.HConnection;
46  import org.apache.hadoop.hbase.client.HConnectionManager;
47  import org.apache.hadoop.hbase.replication.ReplicationException;
48  import org.apache.hadoop.hbase.replication.ReplicationFactory;
49  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
50  import org.apache.hadoop.hbase.replication.ReplicationPeers;
51  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
52  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
53  
54  import com.google.common.annotations.VisibleForTesting;
55  import com.google.common.collect.Lists;
56  
57  /**
58   * <p>
59   * This class provides the administrative interface to HBase cluster
60   * replication. In order to use it, the cluster and the client using
61   * ReplicationAdmin must be configured with <code>hbase.replication</code>
62   * set to true.
63   * </p>
64   * <p>
65   * Adding a new peer results in creating new outbound connections from every
66   * region server to a subset of region servers on the slave cluster. Each
67   * new stream of replication will start replicating from the beginning of the
68   * current WAL, meaning that edits from that past will be replicated.
69   * </p>
70   * <p>
71   * Removing a peer is a destructive and irreversible operation that stops
72   * all the replication streams for the given cluster and deletes the metadata
73   * used to keep track of the replication state.
74   * </p>
75   * <p>
76   * To see which commands are available in the shell, type
77   * <code>replication</code>.
78   * </p>
79   */
80  @InterfaceAudience.Public
81  @InterfaceStability.Evolving
82  public class ReplicationAdmin implements Closeable {
83    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
84  
85    public static final String TNAME = "tableName";
86    public static final String CFNAME = "columnFamlyName";
87  
88    // only Global for now, can add other type
89    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
90    public static final String REPLICATIONTYPE = "replicationType";
91    public static final String REPLICATIONGLOBAL = Integer
92        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
93  
94    private final HConnection connection;
95    // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
96    // be moved to hbase-server. Resolve it in HBASE-11392.
97    private final ReplicationQueuesClient replicationQueuesClient;
98    private final ReplicationPeers replicationPeers;
99  
100   /**
101    * Constructor that creates a connection to the local ZooKeeper ensemble.
102    * @param conf Configuration to use
103    * @throws IOException if an internal replication error occurs
104    * @throws RuntimeException if replication isn't enabled.
105    */
106   public ReplicationAdmin(Configuration conf) throws IOException {
107     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
108         HConstants.REPLICATION_ENABLE_DEFAULT)) {
109       throw new RuntimeException("hbase.replication isn't true, please " +
110           "enable it in order to use replication");
111     }
112     this.connection = HConnectionManager.getConnection(conf);
113     ZooKeeperWatcher zkw = createZooKeeperWatcher();
114     try {
115       this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
116       this.replicationPeers.init();
117       this.replicationQueuesClient =
118           ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
119       this.replicationQueuesClient.init();
120 
121     } catch (ReplicationException e) {
122       throw new IOException("Error initializing the replication admin client.", e);
123     }
124   }
125 
126   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
127     return new ZooKeeperWatcher(connection.getConfiguration(),
128       "Replication Admin", new Abortable() {
129       @Override
130       public void abort(String why, Throwable e) {
131         LOG.error(why, e);
132         System.exit(1);
133       }
134 
135       @Override
136       public boolean isAborted() {
137         return false;
138       }
139 
140     });
141   }
142 
143   /**
144    * Add a new peer cluster to replicate to.
145    * @param id a short name that identifies the cluster
146    * @param clusterKey the concatenation of the slave cluster's
147    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
148    * @throws IllegalStateException if there's already one slave since
149    * multi-slave isn't supported yet.
150    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
151    */
152   @Deprecated
153   public void addPeer(String id, String clusterKey) throws ReplicationException {
154     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
155   }
156 
157   @Deprecated
158   public void addPeer(String id, String clusterKey, String tableCFs)
159     throws ReplicationException {
160     this.replicationPeers.addPeer(id,
161       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
162   }
163 
164   /**
165    * Add a new remote slave cluster for replication.
166    * @param id a short name that identifies the cluster
167    * @param peerConfig configuration for the replication slave cluster
168    * @param tableCfs the table and column-family list which will be replicated for this peer.
169    * A map from tableName to column family names. An empty collection can be passed
170    * to indicate replicating all column families. Pass null for replicating all table and column
171    * families
172    */
173   public void addPeer(String id, ReplicationPeerConfig peerConfig,
174       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
175     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
176   }
177 
178   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
179     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
180       return null;
181     }
182 
183     Map<TableName, List<String>> tableCFsMap = null;
184     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
185     // parse out (table, cf-list) pairs from tableCFsConfig
186     // format: "table1:cf1,cf2;table2:cfA,cfB"
187     String[] tables = tableCFsConfig.split(";");
188     for (String tab : tables) {
189       // 1 ignore empty table config
190       tab = tab.trim();
191       if (tab.length() == 0) {
192         continue;
193       }
194       // 2 split to "table" and "cf1,cf2"
195       //   for each table: "table:cf1,cf2" or "table"
196       String[] pair = tab.split(":");
197       String tabName = pair[0].trim();
198       if (pair.length > 2 || tabName.length() == 0) {
199         LOG.error("ignore invalid tableCFs setting: " + tab);
200         continue;
201       }
202 
203       // 3 parse "cf1,cf2" part to List<cf>
204       List<String> cfs = null;
205       if (pair.length == 2) {
206         String[] cfsList = pair[1].split(",");
207         for (String cf : cfsList) {
208           String cfName = cf.trim();
209           if (cfName.length() > 0) {
210             if (cfs == null) {
211               cfs = new ArrayList<String>();
212             }
213             cfs.add(cfName);
214           }
215         }
216       }
217 
218       // 4 put <table, List<cf>> to map
219       if (tableCFsMap == null) {
220         tableCFsMap = new HashMap<TableName, List<String>>();
221       }
222       tableCFsMap.put(TableName.valueOf(tabName), cfs);
223     }
224     return tableCFsMap;
225   }
226 
227   @VisibleForTesting
228   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
229     String tableCfsStr = null;
230     if (tableCfs != null) {
231       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
232       StringBuilder builder = new StringBuilder();
233       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
234         if (builder.length() > 0) {
235           builder.append(";");
236         }
237         builder.append(entry.getKey());
238         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
239           builder.append(":");
240           builder.append(StringUtils.join(entry.getValue(), ","));
241         }
242       }
243       tableCfsStr = builder.toString();
244     }
245     return tableCfsStr;
246   }
247 
248   /**
249    * Removes a peer cluster and stops the replication to it.
250    * @param id a short name that identifies the cluster
251    */
252   public void removePeer(String id) throws ReplicationException {
253     this.replicationPeers.removePeer(id);
254   }
255 
256   /**
257    * Restart the replication stream to the specified peer.
258    * @param id a short name that identifies the cluster
259    */
260   public void enablePeer(String id) throws ReplicationException {
261     this.replicationPeers.enablePeer(id);
262   }
263 
264   /**
265    * Stop the replication stream to the specified peer.
266    * @param id a short name that identifies the cluster
267    */
268   public void disablePeer(String id) throws ReplicationException {
269     this.replicationPeers.disablePeer(id);
270   }
271 
272   /**
273    * Get the number of slave clusters the local cluster has.
274    * @return number of slave clusters
275    */
276   public int getPeersCount() {
277     return this.replicationPeers.getAllPeerIds().size();
278   }
279 
280   /**
281    * Map of this cluster's peers for display.
282    * @return A map of peer ids to peer cluster keys
283    * @deprecated use {@link #listPeerConfigs()}
284    */
285   @Deprecated
286   public Map<String, String> listPeers() {
287     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
288     Map<String, String> ret = new HashMap<String, String>(peers.size());
289 
290     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
291       ret.put(entry.getKey(), entry.getValue().getClusterKey());
292     }
293     return ret;
294   }
295 
296   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
297     return this.replicationPeers.getAllPeerConfigs();
298   }
299 
300   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
301     return this.replicationPeers.getReplicationPeerConfig(id);
302   }
303 
304   /**
305    * Get the replicable table-cf config of the specified peer.
306    * @param id a short name that identifies the cluster
307    */
308   public String getPeerTableCFs(String id) throws ReplicationException {
309     return this.replicationPeers.getPeerTableCFsConfig(id);
310   }
311 
312   /**
313    * Set the replicable table-cf config of the specified peer
314    * @param id a short name that identifies the cluster
315    * @deprecated use {@link #setPeerTableCFs(String, Map)}
316    */
317   @Deprecated
318   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
319     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
320   }
321 
322   /**
323    * Append the replicable table-cf config of the specified peer
324    * @param id a short that identifies the cluster
325    * @param tableCfs table-cfs config str
326    * @throws KeeperException
327    */
328   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
329     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
330   }
331 
332   /**
333    * Append the replicable table-cf config of the specified peer
334    * @param id a short that identifies the cluster
335    * @param tableCfs A map from tableName to column family names
336    * @throws KeeperException
337    */
338   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
339       throws ReplicationException {
340     if (tableCfs == null) {
341       throw new ReplicationException("tableCfs is null");
342     }
343     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
344     if (preTableCfs == null) {
345       setPeerTableCFs(id, tableCfs);
346       return;
347     }
348 
349     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
350       TableName table = entry.getKey();
351       Collection<String> appendCfs = entry.getValue();
352       if (preTableCfs.containsKey(table)) {
353         List<String> cfs = preTableCfs.get(table);
354         if (cfs == null || appendCfs == null) {
355           preTableCfs.put(table, null);
356         } else {
357           Set<String> cfSet = new HashSet<String>(cfs);
358           cfSet.addAll(appendCfs);
359           preTableCfs.put(table, Lists.newArrayList(cfSet));
360         }
361       } else {
362         if (appendCfs == null || appendCfs.isEmpty()) {
363           preTableCfs.put(table, null);
364         } else {
365           preTableCfs.put(table, Lists.newArrayList(appendCfs));
366         }
367       }
368     }
369     setPeerTableCFs(id, preTableCfs);
370   }
371 
372   /**
373    * Remove some table-cfs from table-cfs config of the specified peer
374    * @param id a short name that identifies the cluster
375    * @param tableCf table-cfs config str
376    * @throws ReplicationException
377    */
378   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
379     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
380   }
381 
382   /**
383    * Remove some table-cfs from config of the specified peer
384    * @param id a short name that identifies the cluster
385    * @param tableCfs A map from tableName to column family names
386    * @throws ReplicationException
387    */
388   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
389       throws ReplicationException {
390     if (tableCfs == null) {
391       throw new ReplicationException("tableCfs is null");
392     }
393 
394     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
395     if (preTableCfs == null) {
396       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
397     }
398     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
399       TableName table = entry.getKey();
400       Collection<String> removeCfs = entry.getValue();
401       if (preTableCfs.containsKey(table)) {
402         List<String> cfs = preTableCfs.get(table);
403         if (cfs == null && removeCfs == null) {
404           preTableCfs.remove(table);
405         } else if (cfs != null && removeCfs != null) {
406           Set<String> cfSet = new HashSet<String>(cfs);
407           cfSet.removeAll(removeCfs);
408           if (cfSet.isEmpty()) {
409             preTableCfs.remove(table);
410           } else {
411             preTableCfs.put(table, Lists.newArrayList(cfSet));
412           }
413         } else if (cfs == null && removeCfs != null) {
414           throw new ReplicationException("Cannot remove cf of table: " + table
415               + " which doesn't specify cfs from table-cfs config in peer: " + id);
416         } else if (cfs != null && removeCfs == null) {
417           throw new ReplicationException("Cannot remove table: " + table
418               + " which has specified cfs from table-cfs config in peer: " + id);
419         }
420       } else {
421         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
422       }
423     }
424     setPeerTableCFs(id, preTableCfs);
425   }
426 
427   /**
428    * Set the replicable table-cf config of the specified peer
429    * @param id a short name that identifies the cluster
430    * @param tableCfs the table and column-family list which will be replicated for this peer.
431    * A map from tableName to column family names. An empty collection can be passed
432    * to indicate replicating all column families. Pass null for replicating all table and column
433    * families
434    */
435   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
436       throws ReplicationException {
437     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
438   }
439 
440   /**
441    * Get the state of the specified peer cluster
442    * @param id String format of the Short name that identifies the peer,
443    * an IllegalArgumentException is thrown if it doesn't exist
444    * @return true if replication is enabled to that peer, false if it isn't
445    */
446   public boolean getPeerState(String id) throws ReplicationException {
447     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
448   }
449 
450   @Override
451   public void close() throws IOException {
452     if (this.connection != null) {
453       this.connection.close();
454     }
455   }
456 
457 
458   /**
459    * Find all column families that are replicated from this cluster
460    * @return the full list of the replicated column families of this cluster as:
461    *        tableName, family name, replicationType
462    *
463    * Currently replicationType is Global. In the future, more replication
464    * types may be extended here. For example
465    *  1) the replication may only apply to selected peers instead of all peers
466    *  2) the replicationType may indicate the host Cluster servers as Slave
467    *     for the table:columnFam.
468    */
469   public List<HashMap<String, String>> listReplicated() throws IOException {
470     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
471 
472     Admin admin = new HBaseAdmin(this.connection.getConfiguration());
473     HTableDescriptor[] tables;
474     try {
475       tables = admin.listTables();
476     } finally {
477       if (admin!= null) admin.close();
478     }
479 
480     for (HTableDescriptor table : tables) {
481       HColumnDescriptor[] columns = table.getColumnFamilies();
482       String tableName = table.getNameAsString();
483       for (HColumnDescriptor column : columns) {
484         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
485           // At this moment, the columfam is replicated to all peers
486           HashMap<String, String> replicationEntry = new HashMap<String, String>();
487           replicationEntry.put(TNAME, tableName);
488           replicationEntry.put(CFNAME, column.getNameAsString());
489           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
490           replicationColFams.add(replicationEntry);
491         }
492       }
493     }
494 
495     return replicationColFams;
496   }
497 }