001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.regex.Pattern; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Admin; 044import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint; 045import org.apache.hadoop.hbase.replication.ReplicationException; 046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 047import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 051import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; 052import org.apache.hadoop.hbase.testclassification.ClientTests; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.junit.After; 055import org.junit.AfterClass; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.rules.TestName; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * Unit testing of ReplicationAdmin 067 */ 068@Category({ MediumTests.class, ClientTests.class }) 069public class TestReplicationAdmin { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestReplicationAdmin.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class); 076 077 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 078 079 private final String ID_ONE = "1"; 080 private static String KEY_ONE; 081 private final String ID_SECOND = "2"; 082 private static String KEY_SECOND; 083 084 private static ReplicationAdmin admin; 085 private static Admin hbaseAdmin; 086 087 @Rule 088 public TestName name = new TestName(); 089 090 /** 091 * @throws java.lang.Exception 092 */ 093 @BeforeClass 094 public static void setUpBeforeClass() throws Exception { 095 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 096 TEST_UTIL.startMiniCluster(); 097 admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); 098 hbaseAdmin = TEST_UTIL.getAdmin(); 099 KEY_ONE = TEST_UTIL.getClusterKey() + "-test1"; 100 KEY_SECOND = TEST_UTIL.getClusterKey() + "-test2"; 101 } 102 103 @AfterClass 104 public static void tearDownAfterClass() throws Exception { 105 if (admin != null) { 106 admin.close(); 107 } 108 TEST_UTIL.shutdownMiniCluster(); 109 } 110 111 @After 112 public void tearDown() throws Exception { 113 for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) { 114 hbaseAdmin.removeReplicationPeer(desc.getPeerId()); 115 } 116 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 117 .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); 118 for (ServerName serverName : queueStorage.getListOfReplicators()) { 119 for (String queue : queueStorage.getAllQueues(serverName)) { 120 queueStorage.removeQueue(serverName, queue); 121 } 122 queueStorage.removeReplicatorIfQueueIsEmpty(serverName); 123 } 124 } 125 126 @Test 127 public void testConcurrentPeerOperations() throws Exception { 128 int threadNum = 5; 129 AtomicLong successCount = new AtomicLong(0); 130 131 // Test concurrent add peer operation 132 Thread[] addPeers = new Thread[threadNum]; 133 for (int i = 0; i < threadNum; i++) { 134 addPeers[i] = new Thread(() -> { 135 try { 136 hbaseAdmin.addReplicationPeer(ID_ONE, 137 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); 138 successCount.incrementAndGet(); 139 } catch (Exception e) { 140 LOG.debug("Got exception when add replication peer", e); 141 } 142 }); 143 addPeers[i].start(); 144 } 145 for (Thread addPeer : addPeers) { 146 addPeer.join(); 147 } 148 assertEquals(1, successCount.get()); 149 150 // Test concurrent remove peer operation 151 successCount.set(0); 152 Thread[] removePeers = new Thread[threadNum]; 153 for (int i = 0; i < threadNum; i++) { 154 removePeers[i] = new Thread(() -> { 155 try { 156 hbaseAdmin.removeReplicationPeer(ID_ONE); 157 successCount.incrementAndGet(); 158 } catch (Exception e) { 159 LOG.debug("Got exception when remove replication peer", e); 160 } 161 }); 162 removePeers[i].start(); 163 } 164 for (Thread removePeer : removePeers) { 165 removePeer.join(); 166 } 167 assertEquals(1, successCount.get()); 168 169 // Test concurrent add peer operation again 170 successCount.set(0); 171 addPeers = new Thread[threadNum]; 172 for (int i = 0; i < threadNum; i++) { 173 addPeers[i] = new Thread(() -> { 174 try { 175 hbaseAdmin.addReplicationPeer(ID_ONE, 176 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); 177 successCount.incrementAndGet(); 178 } catch (Exception e) { 179 LOG.debug("Got exception when add replication peer", e); 180 } 181 }); 182 addPeers[i].start(); 183 } 184 for (Thread addPeer : addPeers) { 185 addPeer.join(); 186 } 187 assertEquals(1, successCount.get()); 188 } 189 190 @Test 191 public void testAddInvalidPeer() { 192 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 193 builder.setClusterKey(KEY_ONE); 194 try { 195 String invalidPeerId = "1-2"; 196 hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build()); 197 fail("Should fail as the peer id: " + invalidPeerId + " is invalid"); 198 } catch (Exception e) { 199 // OK 200 } 201 202 try { 203 String invalidClusterKey = "2181:/hbase"; 204 builder.setClusterKey(invalidClusterKey); 205 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 206 fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid"); 207 } catch (Exception e) { 208 // OK 209 } 210 } 211 212 /** 213 * Simple testing of adding and removing peers, basically shows that all interactions with ZK work 214 * n 215 */ 216 @Test 217 public void testAddRemovePeer() throws Exception { 218 ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder(); 219 rpc1.setClusterKey(KEY_ONE); 220 ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder(); 221 rpc2.setClusterKey(KEY_SECOND); 222 // Add a valid peer 223 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build()); 224 // try adding the same (fails) 225 try { 226 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build()); 227 } catch (Exception e) { 228 // OK! 229 } 230 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 231 // Try to remove an inexisting peer 232 try { 233 hbaseAdmin.removeReplicationPeer(ID_SECOND); 234 fail(); 235 } catch (Exception e) { 236 // OK! 237 } 238 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 239 // Add a second since multi-slave is supported 240 try { 241 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build()); 242 } catch (Exception e) { 243 fail(); 244 } 245 assertEquals(2, hbaseAdmin.listReplicationPeers().size()); 246 // Remove the first peer we added 247 hbaseAdmin.removeReplicationPeer(ID_ONE); 248 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 249 hbaseAdmin.removeReplicationPeer(ID_SECOND); 250 assertEquals(0, hbaseAdmin.listReplicationPeers().size()); 251 } 252 253 @Test 254 public void testAddPeerWithState() throws Exception { 255 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 256 rpc1.setClusterKey(KEY_ONE); 257 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true); 258 assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled()); 259 hbaseAdmin.removeReplicationPeer(ID_ONE); 260 261 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 262 rpc2.setClusterKey(KEY_SECOND); 263 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false); 264 assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled()); 265 hbaseAdmin.removeReplicationPeer(ID_SECOND); 266 } 267 268 /** 269 * Tests that the peer configuration used by ReplicationAdmin contains all the peer's properties. 270 */ 271 @Test 272 public void testPeerConfig() throws Exception { 273 ReplicationPeerConfig config = new ReplicationPeerConfig(); 274 config.setClusterKey(KEY_ONE); 275 config.getConfiguration().put("key1", "value1"); 276 config.getConfiguration().put("key2", "value2"); 277 hbaseAdmin.addReplicationPeer(ID_ONE, config); 278 279 List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers(); 280 assertEquals(1, peers.size()); 281 ReplicationPeerDescription peerOne = peers.get(0); 282 assertNotNull(peerOne); 283 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 284 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 285 286 hbaseAdmin.removeReplicationPeer(ID_ONE); 287 } 288 289 @Test 290 public void testAddPeerWithUnDeletedQueues() throws Exception { 291 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 292 rpc1.setClusterKey(KEY_ONE); 293 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 294 rpc2.setClusterKey(KEY_SECOND); 295 Configuration conf = TEST_UTIL.getConfiguration(); 296 ReplicationQueueStorage queueStorage = 297 ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf); 298 299 ServerName serverName = ServerName.valueOf("server1", 8000, 1234); 300 // add queue for ID_ONE 301 queueStorage.addWAL(serverName, ID_ONE, "file1"); 302 try { 303 admin.addPeer(ID_ONE, rpc1, null); 304 fail(); 305 } catch (Exception e) { 306 // OK! 307 } 308 queueStorage.removeQueue(serverName, ID_ONE); 309 assertEquals(0, queueStorage.getAllQueues(serverName).size()); 310 311 // add recovered queue for ID_ONE 312 queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1"); 313 try { 314 admin.addPeer(ID_ONE, rpc2, null); 315 fail(); 316 } catch (Exception e) { 317 // OK! 318 } 319 } 320 321 /** 322 * basic checks that when we add a peer that it is enabled, and that we can disable n 323 */ 324 @Test 325 public void testEnableDisable() throws Exception { 326 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 327 rpc1.setClusterKey(KEY_ONE); 328 admin.addPeer(ID_ONE, rpc1, null); 329 assertEquals(1, admin.getPeersCount()); 330 assertTrue(admin.getPeerState(ID_ONE)); 331 admin.disablePeer(ID_ONE); 332 333 assertFalse(admin.getPeerState(ID_ONE)); 334 try { 335 admin.getPeerState(ID_SECOND); 336 } catch (ReplicationPeerNotFoundException e) { 337 // OK! 338 } 339 admin.removePeer(ID_ONE); 340 } 341 342 @Test 343 public void testAppendPeerTableCFs() throws Exception { 344 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 345 rpc.setClusterKey(KEY_ONE); 346 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); 347 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); 348 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); 349 final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); 350 final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5"); 351 final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6"); 352 353 // Add a valid peer 354 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 355 356 // Update peer config, not replicate all user tables 357 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 358 rpc.setReplicateAllUserTables(false); 359 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 360 361 Map<TableName, List<String>> tableCFs = new HashMap<>(); 362 tableCFs.put(tableName1, null); 363 admin.appendPeerTableCFs(ID_ONE, tableCFs); 364 Map<TableName, List<String>> result = 365 ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 366 assertEquals(1, result.size()); 367 assertEquals(true, result.containsKey(tableName1)); 368 assertNull(result.get(tableName1)); 369 370 // append table t2 to replication 371 tableCFs.clear(); 372 tableCFs.put(tableName2, null); 373 admin.appendPeerTableCFs(ID_ONE, tableCFs); 374 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 375 assertEquals(2, result.size()); 376 assertTrue("Should contain t1", result.containsKey(tableName1)); 377 assertTrue("Should contain t2", result.containsKey(tableName2)); 378 assertNull(result.get(tableName1)); 379 assertNull(result.get(tableName2)); 380 381 // append table column family: f1 of t3 to replication 382 tableCFs.clear(); 383 tableCFs.put(tableName3, new ArrayList<>()); 384 tableCFs.get(tableName3).add("f1"); 385 admin.appendPeerTableCFs(ID_ONE, tableCFs); 386 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 387 assertEquals(3, result.size()); 388 assertTrue("Should contain t1", result.containsKey(tableName1)); 389 assertTrue("Should contain t2", result.containsKey(tableName2)); 390 assertTrue("Should contain t3", result.containsKey(tableName3)); 391 assertNull(result.get(tableName1)); 392 assertNull(result.get(tableName2)); 393 assertEquals(1, result.get(tableName3).size()); 394 assertEquals("f1", result.get(tableName3).get(0)); 395 396 tableCFs.clear(); 397 tableCFs.put(tableName4, new ArrayList<>()); 398 tableCFs.get(tableName4).add("f1"); 399 tableCFs.get(tableName4).add("f2"); 400 admin.appendPeerTableCFs(ID_ONE, tableCFs); 401 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 402 assertEquals(4, result.size()); 403 assertTrue("Should contain t1", result.containsKey(tableName1)); 404 assertTrue("Should contain t2", result.containsKey(tableName2)); 405 assertTrue("Should contain t3", result.containsKey(tableName3)); 406 assertTrue("Should contain t4", result.containsKey(tableName4)); 407 assertNull(result.get(tableName1)); 408 assertNull(result.get(tableName2)); 409 assertEquals(1, result.get(tableName3).size()); 410 assertEquals("f1", result.get(tableName3).get(0)); 411 assertEquals(2, result.get(tableName4).size()); 412 assertEquals("f1", result.get(tableName4).get(0)); 413 assertEquals("f2", result.get(tableName4).get(1)); 414 415 // append "table5" => [], then append "table5" => ["f1"] 416 tableCFs.clear(); 417 tableCFs.put(tableName5, new ArrayList<>()); 418 admin.appendPeerTableCFs(ID_ONE, tableCFs); 419 tableCFs.clear(); 420 tableCFs.put(tableName5, new ArrayList<>()); 421 tableCFs.get(tableName5).add("f1"); 422 admin.appendPeerTableCFs(ID_ONE, tableCFs); 423 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 424 assertEquals(5, result.size()); 425 assertTrue("Should contain t5", result.containsKey(tableName5)); 426 // null means replication all cfs of tab5 427 assertNull(result.get(tableName5)); 428 429 // append "table6" => ["f1"], then append "table6" => [] 430 tableCFs.clear(); 431 tableCFs.put(tableName6, new ArrayList<>()); 432 tableCFs.get(tableName6).add("f1"); 433 admin.appendPeerTableCFs(ID_ONE, tableCFs); 434 tableCFs.clear(); 435 tableCFs.put(tableName6, new ArrayList<>()); 436 admin.appendPeerTableCFs(ID_ONE, tableCFs); 437 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 438 assertEquals(6, result.size()); 439 assertTrue("Should contain t6", result.containsKey(tableName6)); 440 // null means replication all cfs of tab6 441 assertNull(result.get(tableName6)); 442 443 admin.removePeer(ID_ONE); 444 } 445 446 @Test 447 public void testRemovePeerTableCFs() throws Exception { 448 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 449 rpc.setClusterKey(KEY_ONE); 450 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); 451 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); 452 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); 453 final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); 454 455 // Add a valid peer 456 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 457 458 // Update peer config, not replicate all user tables 459 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 460 rpc.setReplicateAllUserTables(false); 461 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 462 463 Map<TableName, List<String>> tableCFs = new HashMap<>(); 464 try { 465 tableCFs.put(tableName3, null); 466 admin.removePeerTableCFs(ID_ONE, tableCFs); 467 assertTrue(false); 468 } catch (ReplicationException e) { 469 } 470 assertNull(admin.getPeerTableCFs(ID_ONE)); 471 472 tableCFs.clear(); 473 tableCFs.put(tableName1, null); 474 tableCFs.put(tableName2, new ArrayList<>()); 475 tableCFs.get(tableName2).add("cf1"); 476 admin.setPeerTableCFs(ID_ONE, tableCFs); 477 try { 478 tableCFs.clear(); 479 tableCFs.put(tableName3, null); 480 admin.removePeerTableCFs(ID_ONE, tableCFs); 481 assertTrue(false); 482 } catch (ReplicationException e) { 483 } 484 Map<TableName, List<String>> result = 485 ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 486 assertEquals(2, result.size()); 487 assertTrue("Should contain t1", result.containsKey(tableName1)); 488 assertTrue("Should contain t2", result.containsKey(tableName2)); 489 assertNull(result.get(tableName1)); 490 assertEquals(1, result.get(tableName2).size()); 491 assertEquals("cf1", result.get(tableName2).get(0)); 492 493 try { 494 tableCFs.clear(); 495 tableCFs.put(tableName1, new ArrayList<>()); 496 tableCFs.get(tableName1).add("f1"); 497 admin.removePeerTableCFs(ID_ONE, tableCFs); 498 assertTrue(false); 499 } catch (ReplicationException e) { 500 } 501 tableCFs.clear(); 502 tableCFs.put(tableName1, null); 503 admin.removePeerTableCFs(ID_ONE, tableCFs); 504 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 505 assertEquals(1, result.size()); 506 assertEquals(1, result.get(tableName2).size()); 507 assertEquals("cf1", result.get(tableName2).get(0)); 508 509 try { 510 tableCFs.clear(); 511 tableCFs.put(tableName2, null); 512 admin.removePeerTableCFs(ID_ONE, tableCFs); 513 fail(); 514 } catch (ReplicationException e) { 515 } 516 tableCFs.clear(); 517 tableCFs.put(tableName2, new ArrayList<>()); 518 tableCFs.get(tableName2).add("cf1"); 519 admin.removePeerTableCFs(ID_ONE, tableCFs); 520 assertNull(admin.getPeerTableCFs(ID_ONE)); 521 522 tableCFs.clear(); 523 tableCFs.put(tableName4, new ArrayList<>()); 524 admin.setPeerTableCFs(ID_ONE, tableCFs); 525 admin.removePeerTableCFs(ID_ONE, tableCFs); 526 assertNull(admin.getPeerTableCFs(ID_ONE)); 527 528 admin.removePeer(ID_ONE); 529 } 530 531 @Test 532 public void testSetPeerNamespaces() throws Exception { 533 String ns1 = "ns1"; 534 String ns2 = "ns2"; 535 536 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 537 rpc.setClusterKey(KEY_ONE); 538 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 539 540 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 541 rpc.setReplicateAllUserTables(false); 542 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 543 544 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 545 Set<String> namespaces = new HashSet<>(); 546 namespaces.add(ns1); 547 namespaces.add(ns2); 548 rpc.setNamespaces(namespaces); 549 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 550 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); 551 assertEquals(2, namespaces.size()); 552 assertTrue(namespaces.contains(ns1)); 553 assertTrue(namespaces.contains(ns2)); 554 555 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 556 namespaces = new HashSet<>(); 557 namespaces.add(ns1); 558 rpc.setNamespaces(namespaces); 559 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 560 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); 561 assertEquals(1, namespaces.size()); 562 assertTrue(namespaces.contains(ns1)); 563 564 hbaseAdmin.removeReplicationPeer(ID_ONE); 565 } 566 567 @Test 568 public void testSetReplicateAllUserTables() throws Exception { 569 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 570 rpc.setClusterKey(KEY_ONE); 571 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 572 573 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 574 assertTrue(rpc.replicateAllUserTables()); 575 576 rpc.setReplicateAllUserTables(false); 577 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 578 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 579 assertFalse(rpc.replicateAllUserTables()); 580 581 rpc.setReplicateAllUserTables(true); 582 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 583 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 584 assertTrue(rpc.replicateAllUserTables()); 585 586 hbaseAdmin.removeReplicationPeer(ID_ONE); 587 } 588 589 @Test 590 public void testPeerExcludeNamespaces() throws Exception { 591 String ns1 = "ns1"; 592 String ns2 = "ns2"; 593 594 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 595 rpc.setClusterKey(KEY_ONE); 596 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 597 598 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 599 assertTrue(rpc.replicateAllUserTables()); 600 601 Set<String> namespaces = new HashSet<String>(); 602 namespaces.add(ns1); 603 namespaces.add(ns2); 604 rpc.setExcludeNamespaces(namespaces); 605 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 606 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); 607 assertEquals(2, namespaces.size()); 608 assertTrue(namespaces.contains(ns1)); 609 assertTrue(namespaces.contains(ns2)); 610 611 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 612 namespaces = new HashSet<String>(); 613 namespaces.add(ns1); 614 rpc.setExcludeNamespaces(namespaces); 615 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 616 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); 617 assertEquals(1, namespaces.size()); 618 assertTrue(namespaces.contains(ns1)); 619 620 hbaseAdmin.removeReplicationPeer(ID_ONE); 621 } 622 623 @Test 624 public void testPeerExcludeTableCFs() throws Exception { 625 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 626 rpc.setClusterKey(KEY_ONE); 627 TableName tab1 = TableName.valueOf("t1"); 628 TableName tab2 = TableName.valueOf("t2"); 629 TableName tab3 = TableName.valueOf("t3"); 630 TableName tab4 = TableName.valueOf("t4"); 631 632 // Add a valid peer 633 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 634 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 635 assertTrue(rpc.replicateAllUserTables()); 636 637 Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>(); 638 tableCFs.put(tab1, null); 639 rpc.setExcludeTableCFsMap(tableCFs); 640 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 641 Map<TableName, List<String>> result = 642 hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 643 assertEquals(1, result.size()); 644 assertEquals(true, result.containsKey(tab1)); 645 assertNull(result.get(tab1)); 646 647 tableCFs.put(tab2, new ArrayList<String>()); 648 tableCFs.get(tab2).add("f1"); 649 rpc.setExcludeTableCFsMap(tableCFs); 650 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 651 result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 652 assertEquals(2, result.size()); 653 assertTrue("Should contain t1", result.containsKey(tab1)); 654 assertTrue("Should contain t2", result.containsKey(tab2)); 655 assertNull(result.get(tab1)); 656 assertEquals(1, result.get(tab2).size()); 657 assertEquals("f1", result.get(tab2).get(0)); 658 659 tableCFs.clear(); 660 tableCFs.put(tab3, new ArrayList<String>()); 661 tableCFs.put(tab4, new ArrayList<String>()); 662 tableCFs.get(tab4).add("f1"); 663 tableCFs.get(tab4).add("f2"); 664 rpc.setExcludeTableCFsMap(tableCFs); 665 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 666 result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 667 assertEquals(2, result.size()); 668 assertTrue("Should contain t3", result.containsKey(tab3)); 669 assertTrue("Should contain t4", result.containsKey(tab4)); 670 assertNull(result.get(tab3)); 671 assertEquals(2, result.get(tab4).size()); 672 assertEquals("f1", result.get(tab4).get(0)); 673 assertEquals("f2", result.get(tab4).get(1)); 674 675 hbaseAdmin.removeReplicationPeer(ID_ONE); 676 } 677 678 @Test 679 public void testPeerConfigConflict() throws Exception { 680 // Default replicate_all flag is true 681 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 682 rpc.setClusterKey(KEY_ONE); 683 684 String ns1 = "ns1"; 685 Set<String> namespaces = new HashSet<String>(); 686 namespaces.add(ns1); 687 688 TableName tab1 = TableName.valueOf("ns2:tabl"); 689 Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>(); 690 tableCfs.put(tab1, new ArrayList<String>()); 691 692 try { 693 rpc.setNamespaces(namespaces); 694 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 695 fail("Should throw Exception." 696 + " When replicate all flag is true, no need to config namespaces"); 697 } catch (IOException e) { 698 // OK 699 rpc.setNamespaces(null); 700 } 701 702 try { 703 rpc.setTableCFsMap(tableCfs); 704 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 705 fail("Should throw Exception." 706 + " When replicate all flag is true, no need to config table-cfs"); 707 } catch (IOException e) { 708 // OK 709 rpc.setTableCFsMap(null); 710 } 711 712 // Set replicate_all flag to true 713 rpc.setReplicateAllUserTables(false); 714 try { 715 rpc.setExcludeNamespaces(namespaces); 716 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 717 fail("Should throw Exception." 718 + " When replicate all flag is false, no need to config exclude namespaces"); 719 } catch (IOException e) { 720 // OK 721 rpc.setExcludeNamespaces(null); 722 } 723 724 try { 725 rpc.setExcludeTableCFsMap(tableCfs); 726 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 727 fail("Should throw Exception." 728 + " When replicate all flag is false, no need to config exclude table-cfs"); 729 } catch (IOException e) { 730 // OK 731 rpc.setExcludeTableCFsMap(null); 732 } 733 734 rpc.setNamespaces(namespaces); 735 rpc.setTableCFsMap(tableCfs); 736 // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config 737 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 738 739 // Default replicate_all flag is true 740 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 741 rpc2.setClusterKey(KEY_SECOND); 742 rpc2.setExcludeNamespaces(namespaces); 743 rpc2.setExcludeTableCFsMap(tableCfs); 744 // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude 745 // table-cfs config 746 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); 747 748 hbaseAdmin.removeReplicationPeer(ID_ONE); 749 hbaseAdmin.removeReplicationPeer(ID_SECOND); 750 } 751 752 @Test 753 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 754 String ns1 = "ns1"; 755 String ns2 = "ns2"; 756 final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName()); 757 final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2"); 758 759 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 760 rpc.setClusterKey(KEY_ONE); 761 rpc.setReplicateAllUserTables(false); 762 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 763 764 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 765 Set<String> namespaces = new HashSet<String>(); 766 namespaces.add(ns1); 767 rpc.setNamespaces(namespaces); 768 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 769 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 770 try { 771 Map<TableName, List<String>> tableCfs = new HashMap<>(); 772 tableCfs.put(tableName1, new ArrayList<>()); 773 rpc.setTableCFsMap(tableCfs); 774 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 775 fail("Should throw ReplicationException" + " Because table " + tableName1 776 + " conflict with namespace " + ns1); 777 } catch (Exception e) { 778 // OK 779 } 780 781 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 782 Map<TableName, List<String>> tableCfs = new HashMap<>(); 783 tableCfs.put(tableName2, new ArrayList<>()); 784 rpc.setTableCFsMap(tableCfs); 785 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 786 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 787 try { 788 namespaces.clear(); 789 namespaces.add(ns2); 790 rpc.setNamespaces(namespaces); 791 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 792 fail("Should throw ReplicationException" + " Because namespace " + ns2 793 + " conflict with table " + tableName2); 794 } catch (Exception e) { 795 // OK 796 } 797 798 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 799 rpc2.setClusterKey(KEY_SECOND); 800 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); 801 802 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 803 Set<String> excludeNamespaces = new HashSet<String>(); 804 excludeNamespaces.add(ns1); 805 rpc2.setExcludeNamespaces(excludeNamespaces); 806 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 807 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 808 try { 809 Map<TableName, List<String>> excludeTableCfs = new HashMap<>(); 810 excludeTableCfs.put(tableName1, new ArrayList<>()); 811 rpc2.setExcludeTableCFsMap(excludeTableCfs); 812 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 813 fail("Should throw ReplicationException" + " Because exclude table " + tableName1 814 + " conflict with exclude namespace " + ns1); 815 } catch (Exception e) { 816 // OK 817 } 818 819 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 820 Map<TableName, List<String>> excludeTableCfs = new HashMap<>(); 821 excludeTableCfs.put(tableName2, new ArrayList<>()); 822 rpc2.setExcludeTableCFsMap(excludeTableCfs); 823 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 824 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 825 try { 826 namespaces.clear(); 827 namespaces.add(ns2); 828 rpc2.setNamespaces(namespaces); 829 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 830 fail("Should throw ReplicationException" + " Because exclude namespace " + ns2 831 + " conflict with exclude table " + tableName2); 832 } catch (Exception e) { 833 // OK 834 } 835 836 hbaseAdmin.removeReplicationPeer(ID_ONE); 837 hbaseAdmin.removeReplicationPeer(ID_SECOND); 838 } 839 840 @Test 841 public void testPeerBandwidth() throws Exception { 842 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 843 rpc.setClusterKey(KEY_ONE); 844 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 845 846 rpc = admin.getPeerConfig(ID_ONE); 847 assertEquals(0, rpc.getBandwidth()); 848 849 rpc.setBandwidth(2097152); 850 admin.updatePeerConfig(ID_ONE, rpc); 851 852 assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); 853 admin.removePeer(ID_ONE); 854 } 855 856 @Test 857 public void testPeerClusterKey() throws Exception { 858 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 859 builder.setClusterKey(KEY_ONE); 860 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 861 862 try { 863 builder.setClusterKey(KEY_SECOND); 864 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 865 fail("Change cluster key on an existing peer is not allowed"); 866 } catch (Exception e) { 867 // OK 868 } 869 } 870 871 @Test 872 public void testPeerReplicationEndpointImpl() throws Exception { 873 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 874 builder.setClusterKey(KEY_ONE); 875 builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()); 876 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 877 878 try { 879 builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()); 880 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 881 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 882 } catch (Exception e) { 883 // OK 884 } 885 886 try { 887 builder = ReplicationPeerConfig.newBuilder(); 888 builder.setClusterKey(KEY_ONE); 889 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 890 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 891 } catch (Exception e) { 892 // OK 893 } 894 895 builder = ReplicationPeerConfig.newBuilder(); 896 builder.setClusterKey(KEY_SECOND); 897 hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); 898 899 try { 900 builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()); 901 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); 902 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 903 } catch (Exception e) { 904 // OK 905 } 906 } 907}