View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Map.Entry;
29  import org.apache.commons.lang.StringUtils;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.classification.InterfaceStability;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Abortable;
36  import org.apache.hadoop.hbase.HColumnDescriptor;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HTableDescriptor;
39  import org.apache.hadoop.hbase.TableName;
40  import org.apache.hadoop.hbase.client.HConnection;
41  import org.apache.hadoop.hbase.client.HConnectionManager;
42  import org.apache.hadoop.hbase.replication.ReplicationException;
43  import org.apache.hadoop.hbase.replication.ReplicationFactory;
44  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
45  import org.apache.hadoop.hbase.replication.ReplicationPeers;
46  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
47  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48  
49  import com.google.common.annotations.VisibleForTesting;
50  
51  /**
52   * <p>
53   * This class provides the administrative interface to HBase cluster
54   * replication. In order to use it, the cluster and the client using
55   * ReplicationAdmin must be configured with <code>hbase.replication</code>
56   * set to true.
57   * </p>
58   * <p>
59   * Adding a new peer results in creating new outbound connections from every
60   * region server to a subset of region servers on the slave cluster. Each
61   * new stream of replication will start replicating from the beginning of the
62   * current HLog, meaning that edits from the past will be replicated.
63   * </p>
64   * <p>
65   * Removing a peer is a destructive and irreversible operation that stops
66   * all the replication streams for the given cluster and deletes the metadata
67   * used to keep track of the replication state.
68   * </p>
69   * <p>
70   * To see which commands are available in the shell, type
71   * <code>replication</code>.
72   * </p>
73   */
74  @InterfaceAudience.Public
75  @InterfaceStability.Evolving
76  public class ReplicationAdmin implements Closeable {
77    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
78  
79    public static final String TNAME = "tableName";
80    public static final String CFNAME = "columnFamlyName";
81  
82    // only Global for now, can add other type
83    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
84    public static final String REPLICATIONTYPE = "replicationType";
85    public static final String REPLICATIONGLOBAL = Integer
86        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
87  
88    private final HConnection connection;
89    // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
90    // be moved to hbase-server. Resolve it in HBASE-11392.
91    private final ReplicationQueuesClient replicationQueuesClient;
92    private final ReplicationPeers replicationPeers;
93  
94    /**
95     * Constructor that creates a connection to the local ZooKeeper ensemble.
96     * @param conf Configuration to use
97     * @throws IOException if an internal replication error occurs
98     * @throws RuntimeException if replication isn't enabled.
99     */
100   public ReplicationAdmin(Configuration conf) throws IOException {
101     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
102         HConstants.REPLICATION_ENABLE_DEFAULT)) {
103       throw new RuntimeException("hbase.replication isn't true, please " +
104           "enable it in order to use replication");
105     }
106     this.connection = HConnectionManager.getConnection(conf);
107     ZooKeeperWatcher zkw = createZooKeeperWatcher();
108     try {
109       this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
110       this.replicationPeers.init();
111       this.replicationQueuesClient =
112           ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
113       this.replicationQueuesClient.init();
114 
115     } catch (ReplicationException e) {
116       throw new IOException("Error initializing the replication admin client.", e);
117     }
118   }
119 
120   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
121     return new ZooKeeperWatcher(connection.getConfiguration(),
122       "Replication Admin", new Abortable() {
123       @Override
124       public void abort(String why, Throwable e) {
125         LOG.error(why, e);
126         System.exit(1);
127       }
128 
129       @Override
130       public boolean isAborted() {
131         return false;
132       }
133 
134     });
135   }
136 
137   /**
138    * Add a new peer cluster to replicate to.
139    * @param id a short name that identifies the cluster
140    * @param clusterKey the concatenation of the slave cluster's
141    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
142    * @throws IllegalStateException if there's already one slave since
143    * multi-slave isn't supported yet.
144    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
145    */
146   @Deprecated
147   public void addPeer(String id, String clusterKey) throws ReplicationException {
148     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
149   }
150 
151   @Deprecated
152   public void addPeer(String id, String clusterKey, String tableCFs)
153     throws ReplicationException {
154     this.replicationPeers.addPeer(id,
155       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
156   }
157 
158   /**
159    * Add a new remote slave cluster for replication.
160    * @param id a short name that identifies the cluster
161    * @param peerConfig configuration for the replication slave cluster
162    * @param tableCfs the table and column-family list which will be replicated for this peer.
163    * A map from tableName to column family names. An empty collection can be passed
164    * to indicate replicating all column families. Pass null for replicating all table and column
165    * families
166    */
167   public void addPeer(String id, ReplicationPeerConfig peerConfig,
168       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
169     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
170   }
171 
172   @VisibleForTesting
173   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
174     String tableCfsStr = null;
175     if (tableCfs != null) {
176       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
177       StringBuilder builder = new StringBuilder();
178       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
179         if (builder.length() > 0) {
180           builder.append(";");
181         }
182         builder.append(entry.getKey());
183         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
184           builder.append(":");
185           builder.append(StringUtils.join(entry.getValue(), ","));
186         }
187       }
188       tableCfsStr = builder.toString();
189     }
190     return tableCfsStr;
191   }
192 
193   /**
194    * Removes a peer cluster and stops the replication to it.
195    * @param id a short name that identifies the cluster
196    */
197   public void removePeer(String id) throws ReplicationException {
198     this.replicationPeers.removePeer(id);
199   }
200 
201   /**
202    * Restart the replication stream to the specified peer.
203    * @param id a short name that identifies the cluster
204    */
205   public void enablePeer(String id) throws ReplicationException {
206     this.replicationPeers.enablePeer(id);
207   }
208 
209   /**
210    * Stop the replication stream to the specified peer.
211    * @param id a short name that identifies the cluster
212    */
213   public void disablePeer(String id) throws ReplicationException {
214     this.replicationPeers.disablePeer(id);
215   }
216 
217   /**
218    * Get the number of slave clusters the local cluster has.
219    * @return number of slave clusters
220    */
221   public int getPeersCount() {
222     return this.replicationPeers.getAllPeerIds().size();
223   }
224 
225   /**
226    * Map of this cluster's peers for display.
227    * @return A map of peer ids to peer cluster keys
228    * @deprecated use {@link #listPeerConfigs()}
229    */
230   @Deprecated
231   public Map<String, String> listPeers() {
232     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
233     Map<String, String> ret = new HashMap<String, String>(peers.size());
234 
235     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
236       ret.put(entry.getKey(), entry.getValue().getClusterKey());
237     }
238     return ret;
239   }
240 
241   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
242     return this.replicationPeers.getAllPeerConfigs();
243   }
244 
245   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
246     return this.replicationPeers.getReplicationPeerConfig(id);
247   }
248 
249   /**
250    * Get the replicable table-cf config of the specified peer.
251    * @param id a short name that identifies the cluster
252    */
253   public String getPeerTableCFs(String id) throws ReplicationException {
254     return this.replicationPeers.getPeerTableCFsConfig(id);
255   }
256 
257   /**
258    * Set the replicable table-cf config of the specified peer
259    * @param id a short name that identifies the cluster
260    * @deprecated use {@link #setPeerTableCFs(String, Map)}
261    */
262   @Deprecated
263   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
264     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
265   }
266 
267   /**
268    * Set the replicable table-cf config of the specified peer
269    * @param id a short name that identifies the cluster
270    * @param tableCfs the table and column-family list which will be replicated for this peer.
271    * A map from tableName to column family names. An empty collection can be passed
272    * to indicate replicating all column families. Pass null for replicating all table and column
273    * families
274    */
275   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
276       throws ReplicationException {
277     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
278   }
279 
280   /**
281    * Get the state of the specified peer cluster
282    * @param id String format of the Short name that identifies the peer,
283    * an IllegalArgumentException is thrown if it doesn't exist
284    * @return true if replication is enabled to that peer, false if it isn't
285    */
286   public boolean getPeerState(String id) throws ReplicationException {
287     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
288   }
289 
290   @Override
291   public void close() throws IOException {
292     if (this.connection != null) {
293       this.connection.close();
294     }
295   }
296 
297 
298   /**
299    * Find all column families that are replicated from this cluster
300    * @return the full list of the replicated column families of this cluster as:
301    *        tableName, family name, replicationType
302    *
303    * Currently replicationType is Global. In the future, more replication
304    * types may be extended here. For example
305    *  1) the replication may only apply to selected peers instead of all peers
306    *  2) the replicationType may indicate the host Cluster servers as Slave
307    *     for the table:columnFam.
308    */
309   public List<HashMap<String, String>> listReplicated() throws IOException {
310     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
311     HTableDescriptor[] tables = this.connection.listTables();
312 
313     for (HTableDescriptor table : tables) {
314       HColumnDescriptor[] columns = table.getColumnFamilies();
315       String tableName = table.getNameAsString();
316       for (HColumnDescriptor column : columns) {
317         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
318           // At this moment, the columfam is replicated to all peers
319           HashMap<String, String> replicationEntry = new HashMap<String, String>();
320           replicationEntry.put(TNAME, tableName);
321           replicationEntry.put(CFNAME, column.getNameAsString());
322           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
323           replicationColFams.add(replicationEntry);
324         }
325       }
326     }
327 
328     return replicationColFams;
329   }
330 }