1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.replication;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.collect.Lists;
23
24 import java.io.Closeable;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.Set;
34
35 import org.apache.commons.lang.StringUtils;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.conf.Configuration;
39 import org.apache.hadoop.hbase.Abortable;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HTableDescriptor;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.TableNotFoundException;
45 import org.apache.hadoop.hbase.classification.InterfaceAudience;
46 import org.apache.hadoop.hbase.classification.InterfaceStability;
47 import org.apache.hadoop.hbase.client.Admin;
48 import org.apache.hadoop.hbase.client.Connection;
49 import org.apache.hadoop.hbase.client.ConnectionFactory;
50 import org.apache.hadoop.hbase.client.RegionLocator;
51 import org.apache.hadoop.hbase.replication.ReplicationException;
52 import org.apache.hadoop.hbase.replication.ReplicationFactory;
53 import org.apache.hadoop.hbase.replication.ReplicationPeer;
54 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
55 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
56 import org.apache.hadoop.hbase.replication.ReplicationPeers;
57 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
58 import org.apache.hadoop.hbase.util.Pair;
59 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @InterfaceAudience.Public
85 @InterfaceStability.Evolving
86 public class ReplicationAdmin implements Closeable {
/** Logger shared by all instances of this class. */
private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);

/** Key under which {@link #listReplicated()} stores the table name of an entry. */
public static final String TNAME = "tableName";
/**
 * Key under which {@link #listReplicated()} stores the column family name of an entry.
 * NOTE(review): the value is spelled "columnFamlyName"; it is a public runtime key, so
 * correcting the spelling would change what callers have to look up.
 */
public static final String CFNAME = "columnFamlyName";

// Key under which listReplicated() stores the replication type of an entry, and the
// only value that method ever stores for it.

public static final String REPLICATIONTYPE = "replicationType";
public static final String REPLICATIONGLOBAL = Integer
    .toString(HConstants.REPLICATION_SCOPE_GLOBAL);

/** Cluster connection created by the constructor and released in {@link #close()}. */
private final Connection connection;

// Both replication helpers are created from zkw, the configuration and the connection
// in the constructor, and init() is called on each of them there.
private final ReplicationQueuesClient replicationQueuesClient;
private final ReplicationPeers replicationPeers;

/**
 * ZooKeeper watcher built by {@link #createZooKeeperWatcher()}; handed to the replication
 * helpers above and closed in {@link #close()}.
 */

private final ZooKeeperWatcher zkw;
108
109
110
111
112
113
114
/**
 * Creates a replication admin that owns its own cluster connection and ZooKeeper watcher.
 * Anything opened here is closed again if a later initialization step fails.
 *
 * @param conf configuration used to create the connection; replication must be enabled in it
 * @throws IOException if initialization fails; checked exceptions that are not
 *           {@code IOException}s are wrapped in one
 * @throws RuntimeException if replication is not enabled in {@code conf}, or if an
 *           initialization step fails with an unchecked exception
 */
public ReplicationAdmin(Configuration conf) throws IOException {
  // Fail fast, before any resource is opened, when replication is switched off.
  if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
      HConstants.REPLICATION_ENABLE_DEFAULT)) {
    throw new RuntimeException("hbase.replication isn't true, please " +
        "enable it in order to use replication");
  }
  this.connection = ConnectionFactory.createConnection(conf);
  try {
    zkw = createZooKeeperWatcher();
    try {
      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
      this.replicationPeers.init();
      this.replicationQueuesClient =
          ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
      this.replicationQueuesClient.init();
    } catch (Exception exception) {
      // Inner cleanup: the watcher exists at this point, so release it and let the
      // outer handler deal with the connection.
      if (zkw != null) {
        zkw.close();
      }
      throw exception;
    }
  } catch (Exception exception) {
    // Outer cleanup: the connection was opened before the try block.
    if (connection != null) {
      connection.close();
    }
    // Rethrow with the original type where the signature allows it; wrap everything else.
    if (exception instanceof IOException) {
      throw (IOException) exception;
    } else if (exception instanceof RuntimeException) {
      throw (RuntimeException) exception;
    } else {
      throw new IOException("Error initializing the replication admin client.", exception);
    }
  }
}
149
150 private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
151
152 return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
153 @Override
154 public void abort(String why, Throwable e) {
155 LOG.error(why, e);
156
157
158 }
159
160 @Override
161 public boolean isAborted() {
162 return false;
163 }
164 });
165 }
166
167
168
169
170
171
172
173
174
175
176 @Deprecated
177 public void addPeer(String id, String clusterKey) throws ReplicationException {
178 this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
179 }
180
181 @Deprecated
182 public void addPeer(String id, String clusterKey, String tableCFs)
183 throws ReplicationException {
184 this.replicationPeers.addPeer(id,
185 new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
186 }
187
188
189
190
191
192
193
194
195
196
197 public void addPeer(String id, ReplicationPeerConfig peerConfig,
198 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
199 this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
200 }
201
202 public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
203 if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
204 return null;
205 }
206
207 Map<TableName, List<String>> tableCFsMap = null;
208
209
210
211 String[] tables = tableCFsConfig.split(";");
212 for (String tab : tables) {
213
214 tab = tab.trim();
215 if (tab.length() == 0) {
216 continue;
217 }
218
219
220 String[] pair = tab.split(":");
221 String tabName = pair[0].trim();
222 if (pair.length > 2 || tabName.length() == 0) {
223 LOG.error("ignore invalid tableCFs setting: " + tab);
224 continue;
225 }
226
227
228 List<String> cfs = null;
229 if (pair.length == 2) {
230 String[] cfsList = pair[1].split(",");
231 for (String cf : cfsList) {
232 String cfName = cf.trim();
233 if (cfName.length() > 0) {
234 if (cfs == null) {
235 cfs = new ArrayList<String>();
236 }
237 cfs.add(cfName);
238 }
239 }
240 }
241
242
243 if (tableCFsMap == null) {
244 tableCFsMap = new HashMap<TableName, List<String>>();
245 }
246 tableCFsMap.put(TableName.valueOf(tabName), cfs);
247 }
248 return tableCFsMap;
249 }
250
251 @VisibleForTesting
252 static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
253 String tableCfsStr = null;
254 if (tableCfs != null) {
255
256 StringBuilder builder = new StringBuilder();
257 for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
258 if (builder.length() > 0) {
259 builder.append(";");
260 }
261 builder.append(entry.getKey());
262 if (entry.getValue() != null && !entry.getValue().isEmpty()) {
263 builder.append(":");
264 builder.append(StringUtils.join(entry.getValue(), ","));
265 }
266 }
267 tableCfsStr = builder.toString();
268 }
269 return tableCfsStr;
270 }
271
272
273
274
275
276 public void removePeer(String id) throws ReplicationException {
277 this.replicationPeers.removePeer(id);
278 }
279
280
281
282
283
284 public void enablePeer(String id) throws ReplicationException {
285 this.replicationPeers.enablePeer(id);
286 }
287
288
289
290
291
292 public void disablePeer(String id) throws ReplicationException {
293 this.replicationPeers.disablePeer(id);
294 }
295
296
297
298
299
300 public int getPeersCount() {
301 return this.replicationPeers.getAllPeerIds().size();
302 }
303
304
305
306
307
308
309 @Deprecated
310 public Map<String, String> listPeers() {
311 Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
312 Map<String, String> ret = new HashMap<String, String>(peers.size());
313
314 for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
315 ret.put(entry.getKey(), entry.getValue().getClusterKey());
316 }
317 return ret;
318 }
319
320 public Map<String, ReplicationPeerConfig> listPeerConfigs() {
321 return this.replicationPeers.getAllPeerConfigs();
322 }
323
324 public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
325 return this.replicationPeers.getReplicationPeerConfig(id);
326 }
327
328
329
330
331
332 public String getPeerTableCFs(String id) throws ReplicationException {
333 return this.replicationPeers.getPeerTableCFsConfig(id);
334 }
335
336
337
338
339
340
341 @Deprecated
342 public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
343 this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
344 }
345
346
347
348
349
350
351 public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
352 appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
353 }
354
355
356
357
358
359
360 public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
361 throws ReplicationException {
362 if (tableCfs == null) {
363 throw new ReplicationException("tableCfs is null");
364 }
365 Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
366 if (preTableCfs == null) {
367 setPeerTableCFs(id, tableCfs);
368 return;
369 }
370
371 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
372 TableName table = entry.getKey();
373 Collection<String> appendCfs = entry.getValue();
374 if (preTableCfs.containsKey(table)) {
375 List<String> cfs = preTableCfs.get(table);
376 if (cfs == null || appendCfs == null) {
377 preTableCfs.put(table, null);
378 } else {
379 Set<String> cfSet = new HashSet<String>(cfs);
380 cfSet.addAll(appendCfs);
381 preTableCfs.put(table, Lists.newArrayList(cfSet));
382 }
383 } else {
384 if (appendCfs == null || appendCfs.isEmpty()) {
385 preTableCfs.put(table, null);
386 } else {
387 preTableCfs.put(table, Lists.newArrayList(appendCfs));
388 }
389 }
390 }
391 setPeerTableCFs(id, preTableCfs);
392 }
393
394
395
396
397
398
399
400 public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
401 removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
402 }
403
404
405
406
407
408
409
410 public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
411 throws ReplicationException {
412 if (tableCfs == null) {
413 throw new ReplicationException("tableCfs is null");
414 }
415
416 Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
417 if (preTableCfs == null) {
418 throw new ReplicationException("Table-Cfs for peer" + id + " is null");
419 }
420 for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
421 TableName table = entry.getKey();
422 Collection<String> removeCfs = entry.getValue();
423 if (preTableCfs.containsKey(table)) {
424 List<String> cfs = preTableCfs.get(table);
425 if (cfs == null && removeCfs == null) {
426 preTableCfs.remove(table);
427 } else if (cfs != null && removeCfs != null) {
428 Set<String> cfSet = new HashSet<String>(cfs);
429 cfSet.removeAll(removeCfs);
430 if (cfSet.isEmpty()) {
431 preTableCfs.remove(table);
432 } else {
433 preTableCfs.put(table, Lists.newArrayList(cfSet));
434 }
435 } else if (cfs == null && removeCfs != null) {
436 throw new ReplicationException("Cannot remove cf of table: " + table
437 + " which doesn't specify cfs from table-cfs config in peer: " + id);
438 } else if (cfs != null && removeCfs == null) {
439 throw new ReplicationException("Cannot remove table: " + table
440 + " which has specified cfs from table-cfs config in peer: " + id);
441 }
442 } else {
443 throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
444 }
445 }
446 setPeerTableCFs(id, preTableCfs);
447 }
448
449
450
451
452
453
454
455
456
457 public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
458 throws ReplicationException {
459 this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
460 }
461
462
463
464
465
466
467
468 public boolean getPeerState(String id) throws ReplicationException {
469 return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
470 }
471
472 @Override
473 public void close() throws IOException {
474 if (this.zkw != null) {
475 this.zkw.close();
476 }
477 if (this.connection != null) {
478 this.connection.close();
479 }
480 }
481
482
483
484
485
486
487
488
489
490
491
492
493
494 public List<HashMap<String, String>> listReplicated() throws IOException {
495 List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
496
497 Admin admin = connection.getAdmin();
498 HTableDescriptor[] tables;
499 try {
500 tables = admin.listTables();
501 } finally {
502 if (admin!= null) admin.close();
503 }
504
505 for (HTableDescriptor table : tables) {
506 HColumnDescriptor[] columns = table.getColumnFamilies();
507 String tableName = table.getNameAsString();
508 for (HColumnDescriptor column : columns) {
509 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
510
511 HashMap<String, String> replicationEntry = new HashMap<String, String>();
512 replicationEntry.put(TNAME, tableName);
513 replicationEntry.put(CFNAME, column.getNameAsString());
514 replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
515 replicationColFams.add(replicationEntry);
516 }
517 }
518 }
519
520 return replicationColFams;
521 }
522
523
524
525
526
527
528 public void enableTableRep(final TableName tableName) throws IOException {
529 if (tableName == null) {
530 throw new IllegalArgumentException("Table name cannot be null");
531 }
532 try (Admin admin = this.connection.getAdmin()) {
533 if (!admin.tableExists(tableName)) {
534 throw new TableNotFoundException("Table '" + tableName.getNameAsString()
535 + "' does not exists.");
536 }
537 }
538 byte[][] splits = getTableSplitRowKeys(tableName);
539 checkAndSyncTableDescToPeers(tableName, splits);
540 setTableRep(tableName, true);
541 }
542
543
544
545
546
547
548 public void disableTableRep(final TableName tableName) throws IOException {
549 if (tableName == null) {
550 throw new IllegalArgumentException("Table name is null");
551 }
552 try (Admin admin = this.connection.getAdmin()) {
553 if (!admin.tableExists(tableName)) {
554 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
555 + "' does not exists.");
556 }
557 }
558 setTableRep(tableName, false);
559 }
560
561
562
563
564
565
566
567 private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
568 try (RegionLocator locator = connection.getRegionLocator(tableName);) {
569 byte[][] startKeys = locator.getStartKeys();
570 if (startKeys.length == 1) {
571 return null;
572 }
573 byte[][] splits = new byte[startKeys.length - 1][];
574 for (int i = 1; i < startKeys.length; i++) {
575 splits[i - 1] = startKeys[i];
576 }
577 return splits;
578 }
579 }
580
581
582
583
584
585
586
587
588
589
590
591 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
592 throws IOException {
593 List<ReplicationPeer> repPeers = listReplicationPeers();
594 if (repPeers == null || repPeers.size() <= 0) {
595 throw new IllegalArgumentException("Found no peer cluster for replication.");
596 }
597
598 final TableName onlyTableNameQualifier = TableName.valueOf(tableName.getQualifierAsString());
599
600 for (ReplicationPeer repPeer : repPeers) {
601 Map<TableName, List<String>> tableCFMap = repPeer.getTableCFs();
602
603
604 if (tableCFMap != null && !tableCFMap.containsKey(onlyTableNameQualifier)) {
605 continue;
606 }
607
608 Configuration peerConf = repPeer.getConfiguration();
609 HTableDescriptor htd = null;
610 try (Connection conn = ConnectionFactory.createConnection(peerConf);
611 Admin admin = this.connection.getAdmin();
612 Admin repHBaseAdmin = conn.getAdmin()) {
613 htd = admin.getTableDescriptor(tableName);
614 HTableDescriptor peerHtd = null;
615 if (!repHBaseAdmin.tableExists(tableName)) {
616 repHBaseAdmin.createTable(htd, splits);
617 } else {
618 peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
619 if (peerHtd == null) {
620 throw new IllegalArgumentException("Failed to get table descriptor for table "
621 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
622 } else if (!peerHtd.equals(htd)) {
623 throw new IllegalArgumentException("Table " + tableName.getNameAsString()
624 + " exists in peer cluster " + repPeer.getId()
625 + ", but the table descriptors are not same when compared with source cluster."
626 + " Thus can not enable the table's replication switch.");
627 }
628 }
629 }
630 }
631 }
632
633 @VisibleForTesting
634 List<ReplicationPeer> listReplicationPeers() {
635 Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
636 if (peers == null || peers.size() <= 0) {
637 return null;
638 }
639 List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
640 for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
641 String peerId = peerEntry.getKey();
642 try {
643 Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
644 Configuration peerConf = pair.getSecond();
645 ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
646 parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
647 listOfPeers.add(peer);
648 } catch (ReplicationException e) {
649 LOG.warn("Failed to get valid replication peers. "
650 + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
651 + e.getMessage());
652 LOG.debug("Failure details to get valid replication peers.", e);
653 continue;
654 }
655 }
656 return listOfPeers;
657 }
658
659
660
661
662
663
664
665 private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
666 Admin admin = null;
667 try {
668 admin = this.connection.getAdmin();
669 HTableDescriptor htd = admin.getTableDescriptor(tableName);
670 if (isTableRepEnabled(htd) ^ isRepEnabled) {
671 boolean isOnlineSchemaUpdateEnabled =
672 this.connection.getConfiguration()
673 .getBoolean("hbase.online.schema.update.enable", true);
674 if (!isOnlineSchemaUpdateEnabled) {
675 admin.disableTable(tableName);
676 }
677 for (HColumnDescriptor hcd : htd.getFamilies()) {
678 hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
679 : HConstants.REPLICATION_SCOPE_LOCAL);
680 }
681 admin.modifyTable(tableName, htd);
682 if (!isOnlineSchemaUpdateEnabled) {
683 admin.enableTable(tableName);
684 }
685 }
686 } finally {
687 if (admin != null) {
688 try {
689 admin.close();
690 } catch (IOException e) {
691 LOG.warn("Failed to close admin connection.");
692 LOG.debug("Details on failure to close admin connection.", e);
693 }
694 }
695 }
696 }
697
698
699
700
701
702 private boolean isTableRepEnabled(HTableDescriptor htd) {
703 for (HColumnDescriptor hcd : htd.getFamilies()) {
704 if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
705 return false;
706 }
707 }
708 return true;
709 }
710 }