View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Map.Entry;
30  import java.util.Set;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.HColumnDescriptor;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.classification.InterfaceStability;
43  import org.apache.hadoop.hbase.client.Admin;
44  import org.apache.hadoop.hbase.client.Connection;
45  import org.apache.hadoop.hbase.client.ConnectionFactory;
46  import org.apache.hadoop.hbase.replication.ReplicationException;
47  import org.apache.hadoop.hbase.replication.ReplicationFactory;
48  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
49  import org.apache.hadoop.hbase.replication.ReplicationPeers;
50  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
51  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
52  
53  import com.google.common.annotations.VisibleForTesting;
54  import com.google.common.collect.Lists;
55  
56  /**
57   * <p>
58   * This class provides the administrative interface to HBase cluster
59   * replication. In order to use it, the cluster and the client using
60   * ReplicationAdmin must be configured with <code>hbase.replication</code>
61   * set to true.
62   * </p>
63   * <p>
64   * Adding a new peer results in creating new outbound connections from every
65   * region server to a subset of region servers on the slave cluster. Each
66   * new stream of replication will start replicating from the beginning of the
67   * current WAL, meaning that edits from that past will be replicated.
68   * </p>
69   * <p>
70   * Removing a peer is a destructive and irreversible operation that stops
71   * all the replication streams for the given cluster and deletes the metadata
72   * used to keep track of the replication state.
73   * </p>
74   * <p>
75   * To see which commands are available in the shell, type
76   * <code>replication</code>.
77   * </p>
78   */
79  @InterfaceAudience.Public
80  @InterfaceStability.Evolving
81  public class ReplicationAdmin implements Closeable {
82    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
83  
84    public static final String TNAME = "tableName";
85    public static final String CFNAME = "columnFamlyName";
86  
87    // only Global for now, can add other type
88    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
89    public static final String REPLICATIONTYPE = "replicationType";
90    public static final String REPLICATIONGLOBAL = Integer
91        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
92  
93    private final Connection connection;
94    // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
95    // be moved to hbase-server. Resolve it in HBASE-11392.
96    private final ReplicationQueuesClient replicationQueuesClient;
97    private final ReplicationPeers replicationPeers;
98    /**
99     * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
100    * on {@link #close()}.
101    */
102   private final ZooKeeperWatcher zkw;
103 
104   /**
105    * Constructor that creates a connection to the local ZooKeeper ensemble.
106    * @param conf Configuration to use
107    * @throws IOException if an internal replication error occurs
108    * @throws RuntimeException if replication isn't enabled.
109    */
110   public ReplicationAdmin(Configuration conf) throws IOException {
111     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
112         HConstants.REPLICATION_ENABLE_DEFAULT)) {
113       throw new RuntimeException("hbase.replication isn't true, please " +
114           "enable it in order to use replication");
115     }
116     this.connection = ConnectionFactory.createConnection(conf);
117     zkw = createZooKeeperWatcher();
118     try {
119       this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
120       this.replicationPeers.init();
121       this.replicationQueuesClient =
122           ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
123       this.replicationQueuesClient.init();
124 
125     } catch (ReplicationException e) {
126       throw new IOException("Error initializing the replication admin client.", e);
127     }
128   }
129 
130   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
131     // This Abortable doesn't 'abort'... it just logs.
132     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
133       @Override
134       public void abort(String why, Throwable e) {
135         LOG.error(why, e);
136         // We used to call system.exit here but this script can be embedded by other programs that
137         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
138       }
139 
140       @Override
141       public boolean isAborted() {
142         return false;
143       }
144     });
145   }
146 
147   /**
148    * Add a new peer cluster to replicate to.
149    * @param id a short name that identifies the cluster
150    * @param clusterKey the concatenation of the slave cluster's
151    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
152    * @throws IllegalStateException if there's already one slave since
153    * multi-slave isn't supported yet.
154    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
155    */
156   @Deprecated
157   public void addPeer(String id, String clusterKey) throws ReplicationException {
158     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
159   }
160 
161   @Deprecated
162   public void addPeer(String id, String clusterKey, String tableCFs)
163     throws ReplicationException {
164     this.replicationPeers.addPeer(id,
165       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
166   }
167 
168   /**
169    * Add a new remote slave cluster for replication.
170    * @param id a short name that identifies the cluster
171    * @param peerConfig configuration for the replication slave cluster
172    * @param tableCfs the table and column-family list which will be replicated for this peer.
173    * A map from tableName to column family names. An empty collection can be passed
174    * to indicate replicating all column families. Pass null for replicating all table and column
175    * families
176    */
177   public void addPeer(String id, ReplicationPeerConfig peerConfig,
178       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
179     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
180   }
181 
182   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
183     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
184       return null;
185     }
186 
187     Map<TableName, List<String>> tableCFsMap = null;
188     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
189     // parse out (table, cf-list) pairs from tableCFsConfig
190     // format: "table1:cf1,cf2;table2:cfA,cfB"
191     String[] tables = tableCFsConfig.split(";");
192     for (String tab : tables) {
193       // 1 ignore empty table config
194       tab = tab.trim();
195       if (tab.length() == 0) {
196         continue;
197       }
198       // 2 split to "table" and "cf1,cf2"
199       //   for each table: "table:cf1,cf2" or "table"
200       String[] pair = tab.split(":");
201       String tabName = pair[0].trim();
202       if (pair.length > 2 || tabName.length() == 0) {
203         LOG.error("ignore invalid tableCFs setting: " + tab);
204         continue;
205       }
206 
207       // 3 parse "cf1,cf2" part to List<cf>
208       List<String> cfs = null;
209       if (pair.length == 2) {
210         String[] cfsList = pair[1].split(",");
211         for (String cf : cfsList) {
212           String cfName = cf.trim();
213           if (cfName.length() > 0) {
214             if (cfs == null) {
215               cfs = new ArrayList<String>();
216             }
217             cfs.add(cfName);
218           }
219         }
220       }
221 
222       // 4 put <table, List<cf>> to map
223       if (tableCFsMap == null) {
224         tableCFsMap = new HashMap<TableName, List<String>>();
225       }
226       tableCFsMap.put(TableName.valueOf(tabName), cfs);
227     }
228     return tableCFsMap;
229   }
230 
231   @VisibleForTesting
232   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
233     String tableCfsStr = null;
234     if (tableCfs != null) {
235       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
236       StringBuilder builder = new StringBuilder();
237       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
238         if (builder.length() > 0) {
239           builder.append(";");
240         }
241         builder.append(entry.getKey());
242         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
243           builder.append(":");
244           builder.append(StringUtils.join(entry.getValue(), ","));
245         }
246       }
247       tableCfsStr = builder.toString();
248     }
249     return tableCfsStr;
250   }
251 
252   /**
253    * Removes a peer cluster and stops the replication to it.
254    * @param id a short name that identifies the cluster
255    */
256   public void removePeer(String id) throws ReplicationException {
257     this.replicationPeers.removePeer(id);
258   }
259 
260   /**
261    * Restart the replication stream to the specified peer.
262    * @param id a short name that identifies the cluster
263    */
264   public void enablePeer(String id) throws ReplicationException {
265     this.replicationPeers.enablePeer(id);
266   }
267 
268   /**
269    * Stop the replication stream to the specified peer.
270    * @param id a short name that identifies the cluster
271    */
272   public void disablePeer(String id) throws ReplicationException {
273     this.replicationPeers.disablePeer(id);
274   }
275 
276   /**
277    * Get the number of slave clusters the local cluster has.
278    * @return number of slave clusters
279    */
280   public int getPeersCount() {
281     return this.replicationPeers.getAllPeerIds().size();
282   }
283 
284   /**
285    * Map of this cluster's peers for display.
286    * @return A map of peer ids to peer cluster keys
287    * @deprecated use {@link #listPeerConfigs()}
288    */
289   @Deprecated
290   public Map<String, String> listPeers() {
291     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
292     Map<String, String> ret = new HashMap<String, String>(peers.size());
293 
294     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
295       ret.put(entry.getKey(), entry.getValue().getClusterKey());
296     }
297     return ret;
298   }
299 
300   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
301     return this.replicationPeers.getAllPeerConfigs();
302   }
303 
304   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
305     return this.replicationPeers.getReplicationPeerConfig(id);
306   }
307 
308   /**
309    * Get the replicable table-cf config of the specified peer.
310    * @param id a short name that identifies the cluster
311    */
312   public String getPeerTableCFs(String id) throws ReplicationException {
313     return this.replicationPeers.getPeerTableCFsConfig(id);
314   }
315 
316   /**
317    * Set the replicable table-cf config of the specified peer
318    * @param id a short name that identifies the cluster
319    * @deprecated use {@link #setPeerTableCFs(String, Map)}
320    */
321   @Deprecated
322   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
323     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
324   }
325 
326   /**
327    * Append the replicable table-cf config of the specified peer
328    * @param id a short that identifies the cluster
329    * @param tableCfs table-cfs config str
330    * @throws ReplicationException
331    */
332   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
333     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
334   }
335 
336   /**
337    * Append the replicable table-cf config of the specified peer
338    * @param id a short that identifies the cluster
339    * @param tableCfs A map from tableName to column family names
340    * @throws ReplicationException
341    */
342   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
343       throws ReplicationException {
344     if (tableCfs == null) {
345       throw new ReplicationException("tableCfs is null");
346     }
347     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
348     if (preTableCfs == null) {
349       setPeerTableCFs(id, tableCfs);
350       return;
351     }
352 
353     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
354       TableName table = entry.getKey();
355       Collection<String> appendCfs = entry.getValue();
356       if (preTableCfs.containsKey(table)) {
357         List<String> cfs = preTableCfs.get(table);
358         if (cfs == null || appendCfs == null) {
359           preTableCfs.put(table, null);
360         } else {
361           Set<String> cfSet = new HashSet<String>(cfs);
362           cfSet.addAll(appendCfs);
363           preTableCfs.put(table, Lists.newArrayList(cfSet));
364         }
365       } else {
366         if (appendCfs == null || appendCfs.isEmpty()) {
367           preTableCfs.put(table, null);
368         } else {
369           preTableCfs.put(table, Lists.newArrayList(appendCfs));
370         }
371       }
372     }
373     setPeerTableCFs(id, preTableCfs);
374   }
375 
376   /**
377    * Remove some table-cfs from table-cfs config of the specified peer
378    * @param id a short name that identifies the cluster
379    * @param tableCf table-cfs config str
380    * @throws ReplicationException
381    */
382   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
383     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
384   }
385 
386   /**
387    * Remove some table-cfs from config of the specified peer
388    * @param id a short name that identifies the cluster
389    * @param tableCfs A map from tableName to column family names
390    * @throws ReplicationException
391    */
392   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
393       throws ReplicationException {
394     if (tableCfs == null) {
395       throw new ReplicationException("tableCfs is null");
396     }
397 
398     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
399     if (preTableCfs == null) {
400       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
401     }
402     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
403       TableName table = entry.getKey();
404       Collection<String> removeCfs = entry.getValue();
405       if (preTableCfs.containsKey(table)) {
406         List<String> cfs = preTableCfs.get(table);
407         if (cfs == null && removeCfs == null) {
408           preTableCfs.remove(table);
409         } else if (cfs != null && removeCfs != null) {
410           Set<String> cfSet = new HashSet<String>(cfs);
411           cfSet.removeAll(removeCfs);
412           if (cfSet.isEmpty()) {
413             preTableCfs.remove(table);
414           } else {
415             preTableCfs.put(table, Lists.newArrayList(cfSet));
416           }
417         } else if (cfs == null && removeCfs != null) {
418           throw new ReplicationException("Cannot remove cf of table: " + table
419               + " which doesn't specify cfs from table-cfs config in peer: " + id);
420         } else if (cfs != null && removeCfs == null) {
421           throw new ReplicationException("Cannot remove table: " + table
422               + " which has specified cfs from table-cfs config in peer: " + id);
423         }
424       } else {
425         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
426       }
427     }
428     setPeerTableCFs(id, preTableCfs);
429   }
430 
431   /**
432    * Set the replicable table-cf config of the specified peer
433    * @param id a short name that identifies the cluster
434    * @param tableCfs the table and column-family list which will be replicated for this peer.
435    * A map from tableName to column family names. An empty collection can be passed
436    * to indicate replicating all column families. Pass null for replicating all table and column
437    * families
438    */
439   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
440       throws ReplicationException {
441     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
442   }
443 
444   /**
445    * Get the state of the specified peer cluster
446    * @param id String format of the Short name that identifies the peer,
447    * an IllegalArgumentException is thrown if it doesn't exist
448    * @return true if replication is enabled to that peer, false if it isn't
449    */
450   public boolean getPeerState(String id) throws ReplicationException {
451     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
452   }
453 
454   @Override
455   public void close() throws IOException {
456     if (this.zkw != null) {
457       this.zkw.close();
458     }
459     if (this.connection != null) {
460       this.connection.close();
461     }
462   }
463 
464 
465   /**
466    * Find all column families that are replicated from this cluster
467    * @return the full list of the replicated column families of this cluster as:
468    *        tableName, family name, replicationType
469    *
470    * Currently replicationType is Global. In the future, more replication
471    * types may be extended here. For example
472    *  1) the replication may only apply to selected peers instead of all peers
473    *  2) the replicationType may indicate the host Cluster servers as Slave
474    *     for the table:columnFam.
475    */
476   public List<HashMap<String, String>> listReplicated() throws IOException {
477     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
478 
479     Admin admin = connection.getAdmin();
480     HTableDescriptor[] tables;
481     try {
482       tables = admin.listTables();
483     } finally {
484       if (admin!= null) admin.close();
485     }
486 
487     for (HTableDescriptor table : tables) {
488       HColumnDescriptor[] columns = table.getColumnFamilies();
489       String tableName = table.getNameAsString();
490       for (HColumnDescriptor column : columns) {
491         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
492           // At this moment, the columfam is replicated to all peers
493           HashMap<String, String> replicationEntry = new HashMap<String, String>();
494           replicationEntry.put(TNAME, tableName);
495           replicationEntry.put(CFNAME, column.getNameAsString());
496           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
497           replicationColFams.add(replicationEntry);
498         }
499       }
500     }
501 
502     return replicationColFams;
503   }
504 }