View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  
24  import java.io.Closeable;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Set;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.Abortable;
39  import org.apache.hadoop.hbase.HColumnDescriptor;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.TableNotFoundException;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.classification.InterfaceStability;
46  import org.apache.hadoop.hbase.client.Admin;
47  import org.apache.hadoop.hbase.client.Connection;
48  import org.apache.hadoop.hbase.client.ConnectionFactory;
49  import org.apache.hadoop.hbase.client.RegionLocator;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationFactory;
52  import org.apache.hadoop.hbase.replication.ReplicationPeer;
53  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
54  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
57  import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
58  import org.apache.hadoop.hbase.util.Pair;
59  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
60
61  /**
62   * <p>
63   * This class provides the administrative interface to HBase cluster
64   * replication. In order to use it, the cluster and the client using
65   * ReplicationAdmin must be configured with <code>hbase.replication</code>
66   * set to true.
67   * </p>
68   * <p>
69   * Adding a new peer results in creating new outbound connections from every
70   * region server to a subset of region servers on the slave cluster. Each
71   * new stream of replication will start replicating from the beginning of the
72   * current WAL, meaning that edits from the past will be replicated.
73   * </p>
74   * <p>
75   * Removing a peer is a destructive and irreversible operation that stops
76   * all the replication streams for the given cluster and deletes the metadata
77   * used to keep track of the replication state.
78   * </p>
79   * <p>
80   * To see which commands are available in the shell, type
81   * <code>replication</code>.
82   * </p>
83   */
84  @InterfaceAudience.Public
85  @InterfaceStability.Evolving
86  public class ReplicationAdmin implements Closeable {
87    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
88
89    public static final String TNAME = "tableName";
90    public static final String CFNAME = "columnFamilyName";
91
92    // only Global for now, can add other type
93    // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
94    public static final String REPLICATIONTYPE = "replicationType";
95    public static final String REPLICATIONGLOBAL = Integer
96        .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
97
98    private final Connection connection;
99    // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
100   // be moved to hbase-server. Resolve it in HBASE-11392.
101   private final ReplicationQueuesClient replicationQueuesClient;
102   private final ReplicationPeers replicationPeers;
103   /**
104    * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
105    * on {@link #close()}.
106    */
107   private final ZooKeeperWatcher zkw;
108
109   /**
110    * Constructor that creates a connection to the local ZooKeeper ensemble.
111    * @param conf Configuration to use
112    * @throws IOException if an internal replication error occurs
113    * @throws RuntimeException if replication isn't enabled.
114    */
115   public ReplicationAdmin(Configuration conf) throws IOException {
116     this.connection = ConnectionFactory.createConnection(conf);
117     try {
118       zkw = createZooKeeperWatcher();
119       try {
120         this.replicationQueuesClient =
121             ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf,
122             this.connection, zkw));
123         this.replicationQueuesClient.init();
124         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
125           this.replicationQueuesClient, this.connection);
126         this.replicationPeers.init();
127       } catch (Exception exception) {
128         if (zkw != null) {
129           zkw.close();
130         }
131         throw exception;
132       }
133     } catch (Exception exception) {
134       if (connection != null) {
135         connection.close();
136       }
137       if (exception instanceof IOException) {
138         throw (IOException) exception;
139       } else if (exception instanceof RuntimeException) {
140         throw (RuntimeException) exception;
141       } else {
142         throw new IOException("Error initializing the replication admin client.", exception);
143       }
144     }
145   }
146
147   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
148     // This Abortable doesn't 'abort'... it just logs.
149     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
150       @Override
151       public void abort(String why, Throwable e) {
152         LOG.error(why, e);
153         // We used to call system.exit here but this script can be embedded by other programs that
154         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
155       }
156
157       @Override
158       public boolean isAborted() {
159         return false;
160       }
161     });
162   }
163
164   /**
165    * Add a new remote slave cluster for replication.
166    * @param id a short name that identifies the cluster
167    * @param peerConfig configuration for the replication slave cluster
168    * @param tableCfs the table and column-family list which will be replicated for this peer.
169    * A map from tableName to column family names. An empty collection can be passed
170    * to indicate replicating all column families. Pass null for replicating all table and column
171    * families
172    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
173    * use {@link #addPeer(String, ReplicationPeerConfig)} instead.
174    */
175   @Deprecated
176   public void addPeer(String id, ReplicationPeerConfig peerConfig,
177       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
178     if (tableCfs != null) {
179       peerConfig.setTableCFsMap(tableCfs);
180     }
181     this.replicationPeers.registerPeer(id, peerConfig);
182   }
183
184   /**
185    * Add a new remote slave cluster for replication.
186    * @param id a short name that identifies the cluster
187    * @param peerConfig configuration for the replication slave cluster
188    */
189   public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
190     this.replicationPeers.registerPeer(id, peerConfig);
191   }
192
193   /**
194    *  @deprecated as release of 2.0.0, and it will be removed in 3.0.0
195    * */
196   @Deprecated
197   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
198     return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
199   }
200
201   public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
202       throws ReplicationException {
203     this.replicationPeers.updatePeerConfig(id, peerConfig);
204   }
205   /**
206    * Removes a peer cluster and stops the replication to it.
207    * @param id a short name that identifies the cluster
208    */
209   public void removePeer(String id) throws ReplicationException {
210     this.replicationPeers.unregisterPeer(id);
211   }
212
213   /**
214    * Restart the replication stream to the specified peer.
215    * @param id a short name that identifies the cluster
216    */
217   public void enablePeer(String id) throws ReplicationException {
218     this.replicationPeers.enablePeer(id);
219   }
220
221   /**
222    * Stop the replication stream to the specified peer.
223    * @param id a short name that identifies the cluster
224    */
225   public void disablePeer(String id) throws ReplicationException {
226     this.replicationPeers.disablePeer(id);
227   }
228
229   /**
230    * Get the number of slave clusters the local cluster has.
231    * @return number of slave clusters
232    */
233   public int getPeersCount() {
234     return this.replicationPeers.getAllPeerIds().size();
235   }
236
237   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
238     return this.replicationPeers.getAllPeerConfigs();
239   }
240
241   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
242     return this.replicationPeers.getReplicationPeerConfig(id);
243   }
244
245   /**
246    * Get the replicable table-cf config of the specified peer.
247    * @param id a short name that identifies the cluster
248    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
249    * use {@link #getPeerConfig(String)} instead.
250    * */
251   @Deprecated
252   public String getPeerTableCFs(String id) throws ReplicationException {
253     return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id));
254   }
255
256   /**
257    * Append the replicable table-cf config of the specified peer
258    * @param id a short that identifies the cluster
259    * @param tableCfs table-cfs config str
260    * @throws ReplicationException
261    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
262    * use {@link #appendPeerTableCFs(String, Map)} instead.
263    */
264   @Deprecated
265   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
266     appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
267   }
268
269   /**
270    * Append the replicable table-cf config of the specified peer
271    * @param id a short that identifies the cluster
272    * @param tableCfs A map from tableName to column family names
273    * @throws ReplicationException
274    */
275   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
276       throws ReplicationException {
277     if (tableCfs == null) {
278       throw new ReplicationException("tableCfs is null");
279     }
280     Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
281     if (preTableCfs == null) {
282       setPeerTableCFs(id, tableCfs);
283       return;
284     }
285     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
286       TableName table = entry.getKey();
287       Collection<String> appendCfs = entry.getValue();
288       if (preTableCfs.containsKey(table)) {
289         List<String> cfs = preTableCfs.get(table);
290         if (cfs == null || appendCfs == null) {
291           preTableCfs.put(table, null);
292         } else {
293           Set<String> cfSet = new HashSet<String>(cfs);
294           cfSet.addAll(appendCfs);
295           preTableCfs.put(table, Lists.newArrayList(cfSet));
296
297         }
298       } else {
299         if (appendCfs == null || appendCfs.isEmpty()) {
300           preTableCfs.put(table, null);
301         } else {
302           preTableCfs.put(table, Lists.newArrayList(appendCfs));
303         }
304       }
305     }
306     setPeerTableCFs(id, preTableCfs);
307   }
308
309   /**
310    * Remove some table-cfs from table-cfs config of the specified peer
311    * @param id a short name that identifies the cluster
312    * @param tableCf table-cfs config str
313    * @throws ReplicationException
314    * @deprecated as release of 2.0.0, and it will be removed in 3.0.0,
315    * use {@link #removePeerTableCFs(String, Map)} instead.
316    */
317   @Deprecated
318   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
319     removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
320   }
321
322   /**
323    * Remove some table-cfs from config of the specified peer
324    * @param id a short name that identifies the cluster
325    * @param tableCfs A map from tableName to column family names
326    * @throws ReplicationException
327    */
328   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
329       throws ReplicationException {
330     if (tableCfs == null) {
331       throw new ReplicationException("tableCfs is null");
332     }
333     Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
334     if (preTableCfs == null) {
335       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
336     }
337     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
338
339       TableName table = entry.getKey();
340       Collection<String> removeCfs = entry.getValue();
341       if (preTableCfs.containsKey(table)) {
342         List<String> cfs = preTableCfs.get(table);
343         if (cfs == null && removeCfs == null) {
344           preTableCfs.remove(table);
345         } else if (cfs != null && removeCfs != null) {
346           Set<String> cfSet = new HashSet<String>(cfs);
347           cfSet.removeAll(removeCfs);
348           if (cfSet.isEmpty()) {
349             preTableCfs.remove(table);
350           } else {
351             preTableCfs.put(table, Lists.newArrayList(cfSet));
352           }
353         } else if (cfs == null && removeCfs != null) {
354           throw new ReplicationException("Cannot remove cf of table: " + table
355               + " which doesn't specify cfs from table-cfs config in peer: " + id);
356         } else if (cfs != null && removeCfs == null) {
357           throw new ReplicationException("Cannot remove table: " + table
358               + " which has specified cfs from table-cfs config in peer: " + id);
359         }
360       } else {
361         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
362
363       }
364     }
365     setPeerTableCFs(id, preTableCfs);
366   }
367
368   /**
369    * Set the replicable table-cf config of the specified peer
370    * @param id a short name that identifies the cluster
371    * @param tableCfs the table and column-family list which will be replicated for this peer.
372    * A map from tableName to column family names. An empty collection can be passed
373    * to indicate replicating all column families. Pass null for replicating all table and column
374    * families
375    */
376   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
377       throws ReplicationException {
378     this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
379   }
380
381   /**
382    * Get the state of the specified peer cluster
383    * @param id String format of the Short name that identifies the peer,
384    * an IllegalArgumentException is thrown if it doesn't exist
385    * @return true if replication is enabled to that peer, false if it isn't
386    */
387   public boolean getPeerState(String id) throws ReplicationException {
388     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
389   }
390
391   @Override
392   public void close() throws IOException {
393     if (this.zkw != null) {
394       this.zkw.close();
395     }
396     if (this.connection != null) {
397       this.connection.close();
398     }
399   }
400
401
402   /**
403    * Find all column families that are replicated from this cluster
404    * @return the full list of the replicated column families of this cluster as:
405    *        tableName, family name, replicationType
406    *
407    * Currently replicationType is Global. In the future, more replication
408    * types may be extended here. For example
409    *  1) the replication may only apply to selected peers instead of all peers
410    *  2) the replicationType may indicate the host Cluster servers as Slave
411    *     for the table:columnFam.
412    */
413   public List<HashMap<String, String>> listReplicated() throws IOException {
414     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
415
416     Admin admin = connection.getAdmin();
417     HTableDescriptor[] tables;
418     try {
419       tables = admin.listTables();
420     } finally {
421       if (admin!= null) admin.close();
422     }
423
424     for (HTableDescriptor table : tables) {
425       HColumnDescriptor[] columns = table.getColumnFamilies();
426       String tableName = table.getNameAsString();
427       for (HColumnDescriptor column : columns) {
428         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
429           // At this moment, the columfam is replicated to all peers
430           HashMap<String, String> replicationEntry = new HashMap<String, String>();
431           replicationEntry.put(TNAME, tableName);
432           replicationEntry.put(CFNAME, column.getNameAsString());
433           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
434           replicationColFams.add(replicationEntry);
435         }
436       }
437     }
438
439     return replicationColFams;
440   }
441
442   /**
443    * Enable a table's replication switch.
444    * @param tableName name of the table
445    * @throws IOException if a remote or network exception occurs
446    */
447   public void enableTableRep(final TableName tableName) throws IOException {
448     if (tableName == null) {
449       throw new IllegalArgumentException("Table name cannot be null");
450     }
451     try (Admin admin = this.connection.getAdmin()) {
452       if (!admin.tableExists(tableName)) {
453         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
454             + "' does not exists.");
455       }
456     }
457     byte[][] splits = getTableSplitRowKeys(tableName);
458     checkAndSyncTableDescToPeers(tableName, splits);
459     setTableRep(tableName, true);
460   }
461
462   /**
463    * Disable a table's replication switch.
464    * @param tableName name of the table
465    * @throws IOException if a remote or network exception occurs
466    */
467   public void disableTableRep(final TableName tableName) throws IOException {
468     if (tableName == null) {
469       throw new IllegalArgumentException("Table name is null");
470     }
471     try (Admin admin = this.connection.getAdmin()) {
472       if (!admin.tableExists(tableName)) {
473         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
474             + "' does not exists.");
475       }
476     }
477     setTableRep(tableName, false);
478   }
479
480   /**
481    * Get the split row keys of table
482    * @param tableName table name
483    * @return array of split row keys
484    * @throws IOException
485    */
486   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
487     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
488       byte[][] startKeys = locator.getStartKeys();
489       if (startKeys.length == 1) {
490         return null;
491       }
492       byte[][] splits = new byte[startKeys.length - 1][];
493       for (int i = 1; i < startKeys.length; i++) {
494         splits[i - 1] = startKeys[i];
495       }
496       return splits;
497     }
498   }
499
500   /**
501    * Connect to peer and check the table descriptor on peer:
502    * <ol>
503    * <li>Create the same table on peer when not exist.</li>
504    * <li>Throw exception if the table exists on peer cluster but descriptors are not same.</li>
505    * </ol>
506    * @param tableName name of the table to sync to the peer
507    * @param splits table split keys
508    * @throws IOException
509    */
510   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
511       throws IOException {
512     List<ReplicationPeer> repPeers = listReplicationPeers();
513     if (repPeers == null || repPeers.size() <= 0) {
514       throw new IllegalArgumentException("Found no peer cluster for replication.");
515     }
516
517     final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
518
519     for (ReplicationPeer repPeer : repPeers) {
520       Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
521       // TODO Currently peer TableCFs will not include namespace so we need to check only for table
522       // name without namespace in it. Need to correct this logic once we fix HBASE-11386.
523       if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
524         continue;
525       }
526
527       Configuration peerConf = repPeer.getConfiguration();
528       HTableDescriptor htd = null;
529       try (Connection conn = ConnectionFactory.createConnection(peerConf);
530           Admin admin = this.connection.getAdmin();
531           Admin repHBaseAdmin = conn.getAdmin()) {
532         htd = admin.getTableDescriptor(tableName);
533         HTableDescriptor peerHtd = null;
534         if (!repHBaseAdmin.tableExists(tableName)) {
535           repHBaseAdmin.createTable(htd, splits);
536         } else {
537           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
538           if (peerHtd == null) {
539             throw new IllegalArgumentException("Failed to get table descriptor for table "
540                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
541           } else if (!peerHtd.equals(htd)) {
542             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
543                 + " exists in peer cluster " + repPeer.getId()
544                 + ", but the table descriptors are not same when comapred with source cluster."
545                 + " Thus can not enable the table's replication switch.");
546           }
547         }
548       }
549     }
550   }
551
552   @VisibleForTesting
553   public void peerAdded(String id) throws ReplicationException {
554     this.replicationPeers.peerConnected(id);
555   }
556
557   @VisibleForTesting
558   List<ReplicationPeer> listReplicationPeers() {
559     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
560     if (peers == null || peers.size() <= 0) {
561       return null;
562     }
563     List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
564     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
565       String peerId = peerEntry.getKey();
566       try {
567         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
568         Configuration peerConf = pair.getSecond();
569         ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
570           peerId, pair.getFirst(), this.connection);
571         listOfPeers.add(peer);
572       } catch (ReplicationException e) {
573         LOG.warn("Failed to get valid replication peers. "
574             + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
575             + e.getMessage());
576         LOG.debug("Failure details to get valid replication peers.", e);
577         continue;
578       }
579     }
580     return listOfPeers;
581   }
582
583   /**
584    * Set the table's replication switch if the table's replication switch is already not set.
585    * @param tableName name of the table
586    * @param isRepEnabled is replication switch enable or disable
587    * @throws IOException if a remote or network exception occurs
588    */
589   private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
590     Admin admin = null;
591     try {
592       admin = this.connection.getAdmin();
593       HTableDescriptor htd = admin.getTableDescriptor(tableName);
594       if (isTableRepEnabled(htd) ^ isRepEnabled) {
595         for (HColumnDescriptor hcd : htd.getFamilies()) {
596           hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
597               : HConstants.REPLICATION_SCOPE_LOCAL);
598         }
599         admin.modifyTable(tableName, htd);
600       }
601     } finally {
602       if (admin != null) {
603         try {
604           admin.close();
605         } catch (IOException e) {
606           LOG.warn("Failed to close admin connection.");
607           LOG.debug("Details on failure to close admin connection.", e);
608         }
609       }
610     }
611   }
612
613   /**
614    * @param htd table descriptor details for the table to check
615    * @return true if table's replication switch is enabled
616    */
617   private boolean isTableRepEnabled(HTableDescriptor htd) {
618     for (HColumnDescriptor hcd : htd.getFamilies()) {
619       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
620         return false;
621       }
622     }
623     return true;
624   }
625 }