1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.replication;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.Set;
31
32 import org.apache.commons.lang.StringUtils;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.classification.InterfaceAudience;
36 import org.apache.hadoop.hbase.classification.InterfaceStability;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.Abortable;
39 import org.apache.hadoop.hbase.HColumnDescriptor;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.HTableDescriptor;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.TableNotFoundException;
44 import org.apache.hadoop.hbase.classification.InterfaceAudience;
45 import org.apache.hadoop.hbase.classification.InterfaceStability;
46 import org.apache.hadoop.hbase.client.Admin;
47 import org.apache.hadoop.hbase.client.HBaseAdmin;
48 import org.apache.hadoop.hbase.client.Connection;
49 import org.apache.hadoop.hbase.client.ConnectionFactory;
50 import org.apache.hadoop.hbase.client.RegionLocator;
51 import org.apache.hadoop.hbase.replication.ReplicationException;
52 import org.apache.hadoop.hbase.replication.ReplicationFactory;
53 import org.apache.hadoop.hbase.replication.ReplicationPeer;
54 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
55 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
56 import org.apache.hadoop.hbase.replication.ReplicationPeers;
57 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
58 import org.apache.hadoop.hbase.util.Pair;
59 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
60 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
61 import org.apache.zookeeper.KeeperException;
62 import org.apache.zookeeper.data.Stat;
63
64 import com.google.common.annotations.VisibleForTesting;
65 import com.google.common.collect.Lists;
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 @InterfaceAudience.Public
91 @InterfaceStability.Evolving
92 public class ReplicationAdmin implements Closeable {
93 private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
94
95 public static final String TNAME = "tableName";
96 public static final String CFNAME = "columnFamlyName";
97
98
99
100 public static final String REPLICATIONTYPE = "replicationType";
101 public static final String REPLICATIONGLOBAL = Integer
102 .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
103
104 private final Connection connection;
105
106
107 private final ReplicationQueuesClient replicationQueuesClient;
108 private final ReplicationPeers replicationPeers;
109
110
111
112
113 private final ZooKeeperWatcher zkw;
114
115
116
117
118
119
120
/**
 * Creates the admin client: verifies that replication is enabled in {@code conf},
 * opens a {@link Connection}, creates a {@link ZooKeeperWatcher}, and initializes
 * the replication peers and replication queues client.
 * <p>
 * If any step after opening the connection fails, whatever was already opened
 * (watcher, connection) is closed before the failure is propagated.
 *
 * @param conf configuration to read the replication switch from and to connect with
 * @throws IOException if initialization fails; non-IO checked failures are wrapped
 * @throws RuntimeException if replication is disabled in {@code conf}, or if a
 *         runtime failure occurs during initialization
 */
public ReplicationAdmin(Configuration conf) throws IOException {
  boolean replicationEnabled = conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
      HConstants.REPLICATION_ENABLE_DEFAULT);
  if (!replicationEnabled) {
    throw new RuntimeException("hbase.replication isn't true, please " +
        "enable it in order to use replication");
  }
  this.connection = ConnectionFactory.createConnection(conf);
  try {
    zkw = createZooKeeperWatcher();
    try {
      this.replicationPeers =
          ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
      this.replicationPeers.init();
      this.replicationQueuesClient =
          ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
      this.replicationQueuesClient.init();
    } catch (Exception initFailure) {
      // The watcher was created but initialization did not finish: release it.
      if (zkw != null) {
        zkw.close();
      }
      throw initFailure;
    }
  } catch (Exception failure) {
    // Nothing usable was built: release the connection, then surface the cause.
    if (connection != null) {
      connection.close();
    }
    if (failure instanceof IOException) {
      throw (IOException) failure;
    }
    if (failure instanceof RuntimeException) {
      throw (RuntimeException) failure;
    }
    throw new IOException("Error initializing the replication admin client.", failure);
  }
}
155
156 private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
157
158 return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
159 @Override
160 public void abort(String why, Throwable e) {
161 LOG.error(why, e);
162
163
164 }
165
166 @Override
167 public boolean isAborted() {
168 return false;
169 }
170 });
171 }
172
173
174
175
176
177
178
179
180
181
182 @Deprecated
183 public void addPeer(String id, String clusterKey) throws ReplicationException {
184 this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
185 }
186
187 @Deprecated
188 public void addPeer(String id, String clusterKey, String tableCFs)
189 throws ReplicationException {
190 this.replicationPeers.addPeer(id,
191 new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
192 }
193
194
195
196
197
198
199
200
201
202
203 public void addPeer(String id, ReplicationPeerConfig peerConfig,
204 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
205 this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
206 }
207
208 public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
209 if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
210 return null;
211 }
212
213 Map<TableName, List<String>> tableCFsMap = null;
214
215
216
217 String[] tables = tableCFsConfig.split(";");
218 for (String tab : tables) {
219
220 tab = tab.trim();
221 if (tab.length() == 0) {
222 continue;
223 }
224
225
226 String[] pair = tab.split(":");
227 String tabName = pair[0].trim();
228 if (pair.length > 2 || tabName.length() == 0) {
229 LOG.error("ignore invalid tableCFs setting: " + tab);
230 continue;
231 }
232
233
234 List<String> cfs = null;
235 if (pair.length == 2) {
236 String[] cfsList = pair[1].split(",");
237 for (String cf : cfsList) {
238 String cfName = cf.trim();
239 if (cfName.length() > 0) {
240 if (cfs == null) {
241 cfs = new ArrayList<String>();
242 }
243 cfs.add(cfName);
244 }
245 }
246 }
247
248
249 if (tableCFsMap == null) {
250 tableCFsMap = new HashMap<TableName, List<String>>();
251 }
252 tableCFsMap.put(TableName.valueOf(tabName), cfs);
253 }
254 return tableCFsMap;
255 }
256
257 @VisibleForTesting
258 static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
259 String tableCfsStr = null;
260 if (tableCfs != null) {
261
262 StringBuilder builder = new StringBuilder();
263 for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
264 if (builder.length() > 0) {
265 builder.append(";");
266 }
267 builder.append(entry.getKey());
268 if (entry.getValue() != null && !entry.getValue().isEmpty()) {
269 builder.append(":");
270 builder.append(StringUtils.join(entry.getValue(), ","));
271 }
272 }
273 tableCfsStr = builder.toString();
274 }
275 return tableCfsStr;
276 }
277
278
279
280
281
282 public void removePeer(String id) throws ReplicationException {
283 this.replicationPeers.removePeer(id);
284 }
285
286
287
288
289
290 public void enablePeer(String id) throws ReplicationException {
291 this.replicationPeers.enablePeer(id);
292 }
293
294
295
296
297
298 public void disablePeer(String id) throws ReplicationException {
299 this.replicationPeers.disablePeer(id);
300 }
301
302
303
304
305
306 public int getPeersCount() {
307 return this.replicationPeers.getAllPeerIds().size();
308 }
309
310
311
312
313
314
315 @Deprecated
316 public Map<String, String> listPeers() {
317 Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
318 Map<String, String> ret = new HashMap<String, String>(peers.size());
319
320 for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
321 ret.put(entry.getKey(), entry.getValue().getClusterKey());
322 }
323 return ret;
324 }
325
326 public Map<String, ReplicationPeerConfig> listPeerConfigs() {
327 return this.replicationPeers.getAllPeerConfigs();
328 }
329
330 public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
331 return this.replicationPeers.getReplicationPeerConfig(id);
332 }
333
334
335
336
337
338 public String getPeerTableCFs(String id) throws ReplicationException {
339 return this.replicationPeers.getPeerTableCFsConfig(id);
340 }
341
342
343
344
345
346
347 @Deprecated
348 public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
349 this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
350 }
351
352
353
354
355
356
357
358 public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
359 appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
360 }
361
362
363
364
365
366
367
368 public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
369 throws ReplicationException {
370 if (tableCfs == null) {
371 throw new ReplicationException("tableCfs is null");
372 }
373 Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
374 if (preTableCfs == null) {
375 setPeerTableCFs(id, tableCfs);
376 return;
377 }
378
379 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
380 TableName table = entry.getKey();
381 Collection<String> appendCfs = entry.getValue();
382 if (preTableCfs.containsKey(table)) {
383 List<String> cfs = preTableCfs.get(table);
384 if (cfs == null || appendCfs == null) {
385 preTableCfs.put(table, null);
386 } else {
387 Set<String> cfSet = new HashSet<String>(cfs);
388 cfSet.addAll(appendCfs);
389 preTableCfs.put(table, Lists.newArrayList(cfSet));
390 }
391 } else {
392 if (appendCfs == null || appendCfs.isEmpty()) {
393 preTableCfs.put(table, null);
394 } else {
395 preTableCfs.put(table, Lists.newArrayList(appendCfs));
396 }
397 }
398 }
399 setPeerTableCFs(id, preTableCfs);
400 }
401
402
403
404
405
406
407
408 public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
409 removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
410 }
411
412
413
414
415
416
417
418 public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
419 throws ReplicationException {
420 if (tableCfs == null) {
421 throw new ReplicationException("tableCfs is null");
422 }
423
424 Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
425 if (preTableCfs == null) {
426 throw new ReplicationException("Table-Cfs for peer" + id + " is null");
427 }
428 for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
429 TableName table = entry.getKey();
430 Collection<String> removeCfs = entry.getValue();
431 if (preTableCfs.containsKey(table)) {
432 List<String> cfs = preTableCfs.get(table);
433 if (cfs == null && removeCfs == null) {
434 preTableCfs.remove(table);
435 } else if (cfs != null && removeCfs != null) {
436 Set<String> cfSet = new HashSet<String>(cfs);
437 cfSet.removeAll(removeCfs);
438 if (cfSet.isEmpty()) {
439 preTableCfs.remove(table);
440 } else {
441 preTableCfs.put(table, Lists.newArrayList(cfSet));
442 }
443 } else if (cfs == null && removeCfs != null) {
444 throw new ReplicationException("Cannot remove cf of table: " + table
445 + " which doesn't specify cfs from table-cfs config in peer: " + id);
446 } else if (cfs != null && removeCfs == null) {
447 throw new ReplicationException("Cannot remove table: " + table
448 + " which has specified cfs from table-cfs config in peer: " + id);
449 }
450 } else {
451 throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
452 }
453 }
454 setPeerTableCFs(id, preTableCfs);
455 }
456
457
458
459
460
461
462
463
464
465 public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
466 throws ReplicationException {
467 this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
468 }
469
470
471
472
473
474
475
476 public boolean getPeerState(String id) throws ReplicationException {
477 return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
478 }
479
480 @Override
481 public void close() throws IOException {
482 if (this.zkw != null) {
483 this.zkw.close();
484 }
485 if (this.connection != null) {
486 this.connection.close();
487 }
488 }
489
490
491
492
493
494
495
496
497
498
499
500
501
502 public List<HashMap<String, String>> listReplicated() throws IOException {
503 List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
504
505 Admin admin = connection.getAdmin();
506 HTableDescriptor[] tables;
507 try {
508 tables = admin.listTables();
509 } finally {
510 if (admin!= null) admin.close();
511 }
512
513 for (HTableDescriptor table : tables) {
514 HColumnDescriptor[] columns = table.getColumnFamilies();
515 String tableName = table.getNameAsString();
516 for (HColumnDescriptor column : columns) {
517 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
518
519 HashMap<String, String> replicationEntry = new HashMap<String, String>();
520 replicationEntry.put(TNAME, tableName);
521 replicationEntry.put(CFNAME, column.getNameAsString());
522 replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
523 replicationColFams.add(replicationEntry);
524 }
525 }
526 }
527
528 return replicationColFams;
529 }
530
531
532
533
534
535
536 public void enableTableRep(final TableName tableName) throws IOException {
537 if (tableName == null) {
538 throw new IllegalArgumentException("Table name cannot be null");
539 }
540 try (Admin admin = this.connection.getAdmin()) {
541 if (!admin.tableExists(tableName)) {
542 throw new TableNotFoundException("Table '" + tableName.getNameAsString()
543 + "' does not exists.");
544 }
545 }
546 byte[][] splits = getTableSplitRowKeys(tableName);
547 checkAndSyncTableDescToPeers(tableName, splits);
548 setTableRep(tableName, true);
549 }
550
551
552
553
554
555
556 public void disableTableRep(final TableName tableName) throws IOException {
557 if (tableName == null) {
558 throw new IllegalArgumentException("Table name is null");
559 }
560 try (Admin admin = this.connection.getAdmin()) {
561 if (!admin.tableExists(tableName)) {
562 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
563 + "' does not exists.");
564 }
565 }
566 setTableRep(tableName, false);
567 }
568
569
570
571
572
573
574
575 private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
576 try (RegionLocator locator = connection.getRegionLocator(tableName);) {
577 byte[][] startKeys = locator.getStartKeys();
578 if (startKeys.length == 1) {
579 return null;
580 }
581 byte[][] splits = new byte[startKeys.length - 1][];
582 for (int i = 1; i < startKeys.length; i++) {
583 splits[i - 1] = startKeys[i];
584 }
585 return splits;
586 }
587 }
588
589
590
591
592
593
594
595
596
597
598
599 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
600 throws IOException {
601 List<ReplicationPeer> repPeers = listValidReplicationPeers();
602 if (repPeers == null || repPeers.size() <= 0) {
603 throw new IllegalArgumentException("Found no peer cluster for replication.");
604 }
605 for (ReplicationPeer repPeer : repPeers) {
606 Configuration peerConf = repPeer.getConfiguration();
607 HTableDescriptor htd = null;
608 try (Connection conn = ConnectionFactory.createConnection(peerConf);
609 Admin admin = this.connection.getAdmin();
610 Admin repHBaseAdmin = conn.getAdmin()) {
611 htd = admin.getTableDescriptor(tableName);
612 HTableDescriptor peerHtd = null;
613 if (!repHBaseAdmin.tableExists(tableName)) {
614 repHBaseAdmin.createTable(htd, splits);
615 } else {
616 peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
617 if (peerHtd == null) {
618 throw new IllegalArgumentException("Failed to get table descriptor for table "
619 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
620 } else if (!peerHtd.equals(htd)) {
621 throw new IllegalArgumentException("Table " + tableName.getNameAsString()
622 + " exists in peer cluster " + repPeer.getId()
623 + ", but the table descriptors are not same when comapred with source cluster."
624 + " Thus can not enable the table's replication switch.");
625 }
626 }
627 }
628 }
629 }
630
631 private List<ReplicationPeer> listValidReplicationPeers() {
632 Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
633 if (peers == null || peers.size() <= 0) {
634 return null;
635 }
636 List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
637 for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
638 String peerId = peerEntry.getKey();
639 String clusterKey = peerEntry.getValue().getClusterKey();
640 Configuration peerConf = new Configuration(this.connection.getConfiguration());
641 Stat s = null;
642 try {
643 ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
644 Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
645 ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
646 s =
647 zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
648 null);
649 if (null == s) {
650 LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
651 continue;
652 }
653 validPeers.add(peer);
654 } catch (ReplicationException e) {
655 LOG.warn("Failed to get valid replication peers. "
656 + "Error connecting to peer cluster with peerId=" + peerId);
657 LOG.debug("Failure details to get valid replication peers.", e);
658 continue;
659 } catch (KeeperException e) {
660 LOG.warn("Failed to get valid replication peers. KeeperException code="
661 + e.code().intValue());
662 LOG.debug("Failure details to get valid replication peers.", e);
663 continue;
664 } catch (InterruptedException e) {
665 LOG.warn("Failed to get valid replication peers due to InterruptedException.");
666 LOG.debug("Failure details to get valid replication peers.", e);
667 continue;
668 } catch (IOException e) {
669 LOG.warn("Failed to get valid replication peers due to IOException.");
670 LOG.debug("Failure details to get valid replication peers.", e);
671 continue;
672 }
673 }
674 return validPeers;
675 }
676
677
678
679
680
681
682
683 private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
684 Admin admin = null;
685 try {
686 admin = this.connection.getAdmin();
687 HTableDescriptor htd = admin.getTableDescriptor(tableName);
688 if (isTableRepEnabled(htd) ^ isRepEnabled) {
689 boolean isOnlineSchemaUpdateEnabled =
690 this.connection.getConfiguration()
691 .getBoolean("hbase.online.schema.update.enable", true);
692 if (!isOnlineSchemaUpdateEnabled) {
693 admin.disableTable(tableName);
694 }
695 for (HColumnDescriptor hcd : htd.getFamilies()) {
696 hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
697 : HConstants.REPLICATION_SCOPE_LOCAL);
698 }
699 admin.modifyTable(tableName, htd);
700 if (!isOnlineSchemaUpdateEnabled) {
701 admin.enableTable(tableName);
702 }
703 }
704 } finally {
705 if (admin != null) {
706 try {
707 admin.close();
708 } catch (IOException e) {
709 LOG.warn("Failed to close admin connection.");
710 LOG.debug("Details on failure to close admin connection.", e);
711 }
712 }
713 }
714 }
715
716
717
718
719
720 private boolean isTableRepEnabled(HTableDescriptor htd) {
721 for (HColumnDescriptor hcd : htd.getFamilies()) {
722 if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
723 return false;
724 }
725 }
726 return true;
727 }
728 }