View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.classification.InterfaceAudience;
31  import org.apache.hadoop.classification.InterfaceStability;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.Abortable;
34  import org.apache.hadoop.hbase.HColumnDescriptor;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.client.HConnection;
38  import org.apache.hadoop.hbase.client.HConnectionManager;
39  import org.apache.hadoop.hbase.replication.ReplicationException;
40  import org.apache.hadoop.hbase.replication.ReplicationFactory;
41  import org.apache.hadoop.hbase.replication.ReplicationPeers;
42  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
43  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44  
45  /**
46   * <p>
47   * This class provides the administrative interface to HBase cluster
48   * replication. In order to use it, the cluster and the client using
49   * ReplicationAdmin must be configured with <code>hbase.replication</code>
50   * set to true.
51   * </p>
52   * <p>
53   * Adding a new peer results in creating new outbound connections from every
54   * region server to a subset of region servers on the slave cluster. Each
55   * new stream of replication will start replicating from the beginning of the
56   * current HLog, meaning that edits from that past will be replicated.
57   * </p>
58   * <p>
59   * Removing a peer is a destructive and irreversible operation that stops
60   * all the replication streams for the given cluster and deletes the metadata
61   * used to keep track of the replication state.
62   * </p>
63   * <p>
64   * To see which commands are available in the shell, type
65   * <code>replication</code>.
66   * </p>
67   */
68  @InterfaceAudience.Public
69  @InterfaceStability.Evolving
70  public class ReplicationAdmin implements Closeable {
71    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
72  
73    public static final String TNAME = "tableName";
74    public static final String CFNAME = "columnFamlyName";
75  
76    // only Global for now, can add other type
77    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
78    public static final String REPLICATIONTYPE = "replicationType";
79    public static final String REPLICATIONGLOBAL = Integer
80        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
81  
82    private final HConnection connection;
83    private final ReplicationQueuesClient replicationQueuesClient;
84    private final ReplicationPeers replicationPeers;
85  
86    /**
87     * Constructor that creates a connection to the local ZooKeeper ensemble.
88     * @param conf Configuration to use
89     * @throws IOException if an internal replication error occurs
90     * @throws RuntimeException if replication isn't enabled.
91     */
92    public ReplicationAdmin(Configuration conf) throws IOException {
93      if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
94          HConstants.REPLICATION_ENABLE_DEFAULT)) {
95        throw new RuntimeException("hbase.replication isn't true, please " +
96            "enable it in order to use replication");
97      }
98      this.connection = HConnectionManager.getConnection(conf);
99      ZooKeeperWatcher zkw = createZooKeeperWatcher();
100     try {
101       this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
102       this.replicationPeers.init();
103       this.replicationQueuesClient =
104           ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
105       this.replicationQueuesClient.init();
106 
107     } catch (ReplicationException e) {
108       throw new IOException("Error initializing the replication admin client.", e);
109     }
110   }
111 
112   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
113     return new ZooKeeperWatcher(connection.getConfiguration(),
114       "Replication Admin", new Abortable() {
115       @Override
116       public void abort(String why, Throwable e) {
117         LOG.error(why, e);
118         System.exit(1);
119       }
120 
121       @Override
122       public boolean isAborted() {
123         return false;
124       }
125 
126     });
127   }
128 
129 
130   /**
131    * Add a new peer cluster to replicate to.
132    * @param id a short that identifies the cluster
133    * @param clusterKey the concatenation of the slave cluster's
134    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
135    * @throws IllegalStateException if there's already one slave since
136    * multi-slave isn't supported yet.
137    */
138   public void addPeer(String id, String clusterKey) throws ReplicationException {
139     this.replicationPeers.addPeer(id, clusterKey);
140   }
141 
142   public void addPeer(String id, String clusterKey, String tableCFs)
143     throws ReplicationException {
144     this.replicationPeers.addPeer(id, clusterKey, tableCFs);
145   }
146 
147   /**
148    * Removes a peer cluster and stops the replication to it.
149    * @param id a short that identifies the cluster
150    */
151   public void removePeer(String id) throws ReplicationException {
152     this.replicationPeers.removePeer(id);
153   }
154 
155   /**
156    * Restart the replication stream to the specified peer.
157    * @param id a short that identifies the cluster
158    */
159   public void enablePeer(String id) throws ReplicationException {
160     this.replicationPeers.enablePeer(id);
161   }
162 
163   /**
164    * Stop the replication stream to the specified peer.
165    * @param id a short that identifies the cluster
166    */
167   public void disablePeer(String id) throws ReplicationException {
168     this.replicationPeers.disablePeer(id);
169   }
170 
171   /**
172    * Get the number of slave clusters the local cluster has.
173    * @return number of slave clusters
174    */
175   public int getPeersCount() {
176     return this.replicationPeers.getAllPeerIds().size();
177   }
178 
179   /**
180    * Map of this cluster's peers for display.
181    * @return A map of peer ids to peer cluster keys
182    */
183   public Map<String, String> listPeers() {
184     return this.replicationPeers.getAllPeerClusterKeys();
185   }
186 
187   /**
188    * Get the replicable table-cf config of the specified peer.
189    * @param id a short that identifies the cluster
190    */
191   public String getPeerTableCFs(String id) throws ReplicationException {
192     return this.replicationPeers.getPeerTableCFsConfig(id);
193   }
194 
195   /**
196    * Set the replicable table-cf config of the specified peer
197    * @param id a short that identifies the cluster
198    */
199   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
200     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
201   }
202 
203   /**
204    * Get the state of the specified peer cluster
205    * @param id String format of the Short that identifies the peer, an IllegalArgumentException
206    *           is thrown if it doesn't exist
207    * @return true if replication is enabled to that peer, false if it isn't
208    */
209   public boolean getPeerState(String id) throws ReplicationException {
210     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
211   }
212 
213   @Override
214   public void close() throws IOException {
215     if (this.connection != null) {
216       this.connection.close();
217     }
218   }
219 
220   
221   /**
222    * Find all column families that are replicated from this cluster
223    * @return the full list of the replicated column families of this cluster as:
224    *        tableName, family name, replicationType
225    *
226    * Currently replicationType is Global. In the future, more replication
227    * types may be extended here. For example
228    *  1) the replication may only apply to selected peers instead of all peers
229    *  2) the replicationType may indicate the host Cluster servers as Slave
230    *     for the table:columnFam.         
231    */
232   public List<HashMap<String, String>> listReplicated() throws IOException {
233     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
234     HTableDescriptor[] tables = this.connection.listTables();
235 
236     for (HTableDescriptor table : tables) {
237       HColumnDescriptor[] columns = table.getColumnFamilies();
238       String tableName = table.getNameAsString();
239       for (HColumnDescriptor column : columns) {
240         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
241           // At this moment, the columfam is replicated to all peers
242           HashMap<String, String> replicationEntry = new HashMap<String, String>();
243           replicationEntry.put(TNAME, tableName);
244           replicationEntry.put(CFNAME, column.getNameAsString());
245           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
246           replicationColFams.add(replicationEntry);
247         }
248       }
249     }
250 
251     return replicationColFams;
252   } 
253 }