View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.classification.InterfaceStability;
43  import org.apache.hadoop.hbase.client.HConnection;
44  import org.apache.hadoop.hbase.client.HConnectionManager;
45  import org.apache.hadoop.hbase.replication.ReplicationException;
46  import org.apache.hadoop.hbase.replication.ReplicationFactory;
47  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
48  import org.apache.hadoop.hbase.replication.ReplicationPeers;
49  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
50  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
51  import org.apache.zookeeper.KeeperException;
52  
53  import com.google.common.annotations.VisibleForTesting;
54  import com.google.common.collect.Lists;
55  
56  /**
57   * <p>
58   * This class provides the administrative interface to HBase cluster
59   * replication. In order to use it, the cluster and the client using
60   * ReplicationAdmin must be configured with <code>hbase.replication</code>
61   * set to true.
62   * </p>
63   * <p>
64   * Adding a new peer results in creating new outbound connections from every
65   * region server to a subset of region servers on the slave cluster. Each
66   * new stream of replication will start replicating from the beginning of the
67   * current WAL, meaning that edits from that past will be replicated.
68   * </p>
69   * <p>
70   * Removing a peer is a destructive and irreversible operation that stops
71   * all the replication streams for the given cluster and deletes the metadata
72   * used to keep track of the replication state.
73   * </p>
74   * <p>
75   * To see which commands are available in the shell, type
76   * <code>replication</code>.
77   * </p>
78   */
79  @InterfaceAudience.Public
80  @InterfaceStability.Evolving
81  public class ReplicationAdmin implements Closeable {
82    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
83  
84    public static final String TNAME = "tableName";
85    public static final String CFNAME = "columnFamlyName";
86  
87    // only Global for now, can add other type
88    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
89    public static final String REPLICATIONTYPE = "replicationType";
90    public static final String REPLICATIONGLOBAL = Integer
91        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
92  
93    private final HConnection connection;
94    // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
95    // be moved to hbase-server. Resolve it in HBASE-11392.
96    private final ReplicationQueuesClient replicationQueuesClient;
97    private final ReplicationPeers replicationPeers;
98  
99    /**
100    * Constructor that creates a connection to the local ZooKeeper ensemble.
101    * @param conf Configuration to use
102    * @throws IOException if an internal replication error occurs
103    * @throws RuntimeException if replication isn't enabled.
104    */
105   public ReplicationAdmin(Configuration conf) throws IOException {
106     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
107         HConstants.REPLICATION_ENABLE_DEFAULT)) {
108       throw new RuntimeException("hbase.replication isn't true, please " +
109           "enable it in order to use replication");
110     }
111     this.connection = HConnectionManager.getConnection(conf);
112     ZooKeeperWatcher zkw = createZooKeeperWatcher();
113     try {
114       this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
115       this.replicationPeers.init();
116       this.replicationQueuesClient =
117           ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
118       this.replicationQueuesClient.init();
119 
120     } catch (ReplicationException e) {
121       throw new IOException("Error initializing the replication admin client.", e);
122     }
123   }
124 
125   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
126     return new ZooKeeperWatcher(connection.getConfiguration(),
127       "Replication Admin", new Abortable() {
128       @Override
129       public void abort(String why, Throwable e) {
130         LOG.error(why, e);
131         System.exit(1);
132       }
133 
134       @Override
135       public boolean isAborted() {
136         return false;
137       }
138 
139     });
140   }
141 
142   /**
143    * Add a new peer cluster to replicate to.
144    * @param id a short name that identifies the cluster
145    * @param clusterKey the concatenation of the slave cluster's
146    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
147    * @throws IllegalStateException if there's already one slave since
148    * multi-slave isn't supported yet.
149    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
150    */
151   @Deprecated
152   public void addPeer(String id, String clusterKey) throws ReplicationException {
153     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
154   }
155 
156   @Deprecated
157   public void addPeer(String id, String clusterKey, String tableCFs)
158     throws ReplicationException {
159     this.replicationPeers.addPeer(id,
160       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
161   }
162 
163   /**
164    * Add a new remote slave cluster for replication.
165    * @param id a short name that identifies the cluster
166    * @param peerConfig configuration for the replication slave cluster
167    * @param tableCfs the table and column-family list which will be replicated for this peer.
168    * A map from tableName to column family names. An empty collection can be passed
169    * to indicate replicating all column families. Pass null for replicating all table and column
170    * families
171    */
172   public void addPeer(String id, ReplicationPeerConfig peerConfig,
173       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
174     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
175   }
176 
177   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
178     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
179       return null;
180     }
181 
182     Map<TableName, List<String>> tableCFsMap = null;
183     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
184     // parse out (table, cf-list) pairs from tableCFsConfig
185     // format: "table1:cf1,cf2;table2:cfA,cfB"
186     String[] tables = tableCFsConfig.split(";");
187     for (String tab : tables) {
188       // 1 ignore empty table config
189       tab = tab.trim();
190       if (tab.length() == 0) {
191         continue;
192       }
193       // 2 split to "table" and "cf1,cf2"
194       //   for each table: "table:cf1,cf2" or "table"
195       String[] pair = tab.split(":");
196       String tabName = pair[0].trim();
197       if (pair.length > 2 || tabName.length() == 0) {
198         LOG.error("ignore invalid tableCFs setting: " + tab);
199         continue;
200       }
201 
202       // 3 parse "cf1,cf2" part to List<cf>
203       List<String> cfs = null;
204       if (pair.length == 2) {
205         String[] cfsList = pair[1].split(",");
206         for (String cf : cfsList) {
207           String cfName = cf.trim();
208           if (cfName.length() > 0) {
209             if (cfs == null) {
210               cfs = new ArrayList<String>();
211             }
212             cfs.add(cfName);
213           }
214         }
215       }
216 
217       // 4 put <table, List<cf>> to map
218       if (tableCFsMap == null) {
219         tableCFsMap = new HashMap<TableName, List<String>>();
220       }
221       tableCFsMap.put(TableName.valueOf(tabName), cfs);
222     }
223     return tableCFsMap;
224   }
225 
226   @VisibleForTesting
227   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
228     String tableCfsStr = null;
229     if (tableCfs != null) {
230       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
231       StringBuilder builder = new StringBuilder();
232       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
233         if (builder.length() > 0) {
234           builder.append(";");
235         }
236         builder.append(entry.getKey());
237         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
238           builder.append(":");
239           builder.append(StringUtils.join(entry.getValue(), ","));
240         }
241       }
242       tableCfsStr = builder.toString();
243     }
244     return tableCfsStr;
245   }
246 
247   /**
248    * Removes a peer cluster and stops the replication to it.
249    * @param id a short name that identifies the cluster
250    */
251   public void removePeer(String id) throws ReplicationException {
252     this.replicationPeers.removePeer(id);
253   }
254 
255   /**
256    * Restart the replication stream to the specified peer.
257    * @param id a short name that identifies the cluster
258    */
259   public void enablePeer(String id) throws ReplicationException {
260     this.replicationPeers.enablePeer(id);
261   }
262 
263   /**
264    * Stop the replication stream to the specified peer.
265    * @param id a short name that identifies the cluster
266    */
267   public void disablePeer(String id) throws ReplicationException {
268     this.replicationPeers.disablePeer(id);
269   }
270 
271   /**
272    * Get the number of slave clusters the local cluster has.
273    * @return number of slave clusters
274    */
275   public int getPeersCount() {
276     return this.replicationPeers.getAllPeerIds().size();
277   }
278 
279   /**
280    * Map of this cluster's peers for display.
281    * @return A map of peer ids to peer cluster keys
282    * @deprecated use {@link #listPeerConfigs()}
283    */
284   @Deprecated
285   public Map<String, String> listPeers() {
286     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
287     Map<String, String> ret = new HashMap<String, String>(peers.size());
288 
289     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
290       ret.put(entry.getKey(), entry.getValue().getClusterKey());
291     }
292     return ret;
293   }
294 
295   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
296     return this.replicationPeers.getAllPeerConfigs();
297   }
298 
299   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
300     return this.replicationPeers.getReplicationPeerConfig(id);
301   }
302 
303   /**
304    * Get the replicable table-cf config of the specified peer.
305    * @param id a short name that identifies the cluster
306    */
307   public String getPeerTableCFs(String id) throws ReplicationException {
308     return this.replicationPeers.getPeerTableCFsConfig(id);
309   }
310 
311   /**
312    * Set the replicable table-cf config of the specified peer
313    * @param id a short name that identifies the cluster
314    * @deprecated use {@link #setPeerTableCFs(String, Map)}
315    */
316   @Deprecated
317   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
318     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
319   }
320 
321   /**
322    * Append the replicable table-cf config of the specified peer
323    * @param id a short that identifies the cluster
324    * @param tableCfs table-cfs config str
325    * @throws KeeperException
326    */
327   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
328     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
329   }
330 
331   /**
332    * Append the replicable table-cf config of the specified peer
333    * @param id a short that identifies the cluster
334    * @param tableCfs A map from tableName to column family names
335    * @throws KeeperException
336    */
337   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
338       throws ReplicationException {
339     if (tableCfs == null) {
340       throw new ReplicationException("tableCfs is null");
341     }
342     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
343     if (preTableCfs == null) {
344       setPeerTableCFs(id, tableCfs);
345       return;
346     }
347 
348     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
349       TableName table = entry.getKey();
350       Collection<String> appendCfs = entry.getValue();
351       if (preTableCfs.containsKey(table)) {
352         List<String> cfs = preTableCfs.get(table);
353         if (cfs == null || appendCfs == null) {
354           preTableCfs.put(table, null);
355         } else {
356           Set<String> cfSet = new HashSet<String>(cfs);
357           cfSet.addAll(appendCfs);
358           preTableCfs.put(table, Lists.newArrayList(cfSet));
359         }
360       } else {
361         if (appendCfs == null || appendCfs.isEmpty()) {
362           preTableCfs.put(table, null);
363         } else {
364           preTableCfs.put(table, Lists.newArrayList(appendCfs));
365         }
366       }
367     }
368     setPeerTableCFs(id, preTableCfs);
369   }
370 
371   /**
372    * Remove some table-cfs from table-cfs config of the specified peer
373    * @param id a short name that identifies the cluster
374    * @param tableCf table-cfs config str
375    * @throws ReplicationException
376    */
377   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
378     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
379   }
380 
381   /**
382    * Remove some table-cfs from config of the specified peer
383    * @param id a short name that identifies the cluster
384    * @param tableCfs A map from tableName to column family names
385    * @throws ReplicationException
386    */
387   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
388       throws ReplicationException {
389     if (tableCfs == null) {
390       throw new ReplicationException("tableCfs is null");
391     }
392 
393     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
394     if (preTableCfs == null) {
395       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
396     }
397     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
398       TableName table = entry.getKey();
399       Collection<String> removeCfs = entry.getValue();
400       if (preTableCfs.containsKey(table)) {
401         List<String> cfs = preTableCfs.get(table);
402         if (cfs == null && removeCfs == null) {
403           preTableCfs.remove(table);
404         } else if (cfs != null && removeCfs != null) {
405           Set<String> cfSet = new HashSet<String>(cfs);
406           cfSet.removeAll(removeCfs);
407           if (cfSet.isEmpty()) {
408             preTableCfs.remove(table);
409           } else {
410             preTableCfs.put(table, Lists.newArrayList(cfSet));
411           }
412         } else if (cfs == null && removeCfs != null) {
413           throw new ReplicationException("Cannot remove cf of table: " + table
414               + " which doesn't specify cfs from table-cfs config in peer: " + id);
415         } else if (cfs != null && removeCfs == null) {
416           throw new ReplicationException("Cannot remove table: " + table
417               + " which has specified cfs from table-cfs config in peer: " + id);
418         }
419       } else {
420         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
421       }
422     }
423     setPeerTableCFs(id, preTableCfs);
424   }
425 
426   /**
427    * Set the replicable table-cf config of the specified peer
428    * @param id a short name that identifies the cluster
429    * @param tableCfs the table and column-family list which will be replicated for this peer.
430    * A map from tableName to column family names. An empty collection can be passed
431    * to indicate replicating all column families. Pass null for replicating all table and column
432    * families
433    */
434   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
435       throws ReplicationException {
436     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
437   }
438 
439   /**
440    * Get the state of the specified peer cluster
441    * @param id String format of the Short name that identifies the peer,
442    * an IllegalArgumentException is thrown if it doesn't exist
443    * @return true if replication is enabled to that peer, false if it isn't
444    */
445   public boolean getPeerState(String id) throws ReplicationException {
446     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
447   }
448 
449   @Override
450   public void close() throws IOException {
451     if (this.connection != null) {
452       this.connection.close();
453     }
454   }
455 
456 
457   /**
458    * Find all column families that are replicated from this cluster
459    * @return the full list of the replicated column families of this cluster as:
460    *        tableName, family name, replicationType
461    *
462    * Currently replicationType is Global. In the future, more replication
463    * types may be extended here. For example
464    *  1) the replication may only apply to selected peers instead of all peers
465    *  2) the replicationType may indicate the host Cluster servers as Slave
466    *     for the table:columnFam.
467    */
468   public List<HashMap<String, String>> listReplicated() throws IOException {
469     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
470     HTableDescriptor[] tables = this.connection.listTables();
471 
472     for (HTableDescriptor table : tables) {
473       HColumnDescriptor[] columns = table.getColumnFamilies();
474       String tableName = table.getNameAsString();
475       for (HColumnDescriptor column : columns) {
476         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
477           // At this moment, the columfam is replicated to all peers
478           HashMap<String, String> replicationEntry = new HashMap<String, String>();
479           replicationEntry.put(TNAME, tableName);
480           replicationEntry.put(CFNAME, column.getNameAsString());
481           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
482           replicationColFams.add(replicationEntry);
483         }
484       }
485     }
486 
487     return replicationColFams;
488   }
489 }