/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Unit testing of ReplicationAdmin.
 * <p>
 * The tests share one mini cluster started in {@link #setUpBeforeClass()}. {@link #tearDown()}
 * removes every replication peer and replication queue after each test so that the tests do not
 * see each other's peers.
 */
@Category({MediumTests.class, ClientTests.class})
public class TestReplicationAdmin {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationAdmin.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  // Peer ids used throughout the tests; the matching cluster keys are derived from the mini
  // cluster's key once it has been started.
  private static final String ID_ONE = "1";
  private static String KEY_ONE;
  private static final String ID_SECOND = "2";
  private static String KEY_SECOND;

  private static ReplicationAdmin admin;
  private static Admin hbaseAdmin;

  @Rule
  public TestName name = new TestName();

  /**
   * An action against the replication admin API that may throw.
   */
  @FunctionalInterface
  private interface PeerOperation {
    void run() throws Exception;
  }

  /**
   * Starts the mini cluster and creates the admin handles shared by all tests.
   * @throws Exception if the mini cluster or the admin clients cannot be created
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    TEST_UTIL.startMiniCluster();
    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
    hbaseAdmin = TEST_UTIL.getAdmin();
    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
    KEY_SECOND = TEST_UTIL.getClusterKey() + "-test2";
  }

  /**
   * Closes the ReplicationAdmin (if it was created) and shuts the mini cluster down.
   * @throws Exception if closing the admin or stopping the cluster fails
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (admin != null) {
      admin.close();
    }
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Removes every replication peer and every replication queue left behind by a test.
   * @throws Exception if a peer or queue cannot be listed or removed
   */
  @After
  public void tearDown() throws Exception {
    for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
      hbaseAdmin.removeReplicationPeer(desc.getPeerId());
    }
    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
    for (ServerName serverName : queueStorage.getListOfReplicators()) {
      for (String queue : queueStorage.getAllQueues(serverName)) {
        queueStorage.removeQueue(serverName, queue);
      }
      queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
    }
  }

  /**
   * Runs {@code operation} once on each of {@code threadNum} freshly started threads, waits for
   * all of them and reports how many invocations completed without throwing.
   * @param threadNum number of threads to start
   * @param failureLogMessage message logged (at debug level) for every invocation that throws
   * @param operation the action each thread performs
   * @return the number of invocations that returned normally
   * @throws InterruptedException if the calling thread is interrupted while joining the workers
   */
  private static long countConcurrentSuccesses(int threadNum, String failureLogMessage,
      PeerOperation operation) throws InterruptedException {
    AtomicLong successCount = new AtomicLong(0);
    Thread[] workers = new Thread[threadNum];
    for (int i = 0; i < threadNum; i++) {
      workers[i] = new Thread(() -> {
        try {
          operation.run();
          successCount.incrementAndGet();
        } catch (Exception e) {
          LOG.debug(failureLogMessage, e);
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    return successCount.get();
  }

  /**
   * Reads the table-cfs configuration of a peer through ReplicationAdmin and parses it into a
   * map.
   * @param peerId id of the peer to read
   * @return the value produced by {@code ReplicationPeerConfigUtil.parseTableCFsFromConfig}
   * @throws Exception if the peer's table-cfs cannot be read
   */
  private static Map<TableName, List<String>> readPeerTableCFs(String peerId) throws Exception {
    return ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(peerId));
  }

  /**
   * When several threads add (or remove) the same peer at once, exactly one of them must succeed.
   */
  @Test
  public void testConcurrentPeerOperations() throws Exception {
    int threadNum = 5;
    PeerOperation addPeer = () -> hbaseAdmin.addReplicationPeer(ID_ONE,
      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
    PeerOperation removePeer = () -> hbaseAdmin.removeReplicationPeer(ID_ONE);

    // Test concurrent add peer operation
    assertEquals(1,
      countConcurrentSuccesses(threadNum, "Got exception when add replication peer", addPeer));

    // Test concurrent remove peer operation
    assertEquals(1, countConcurrentSuccesses(threadNum,
      "Got exception when remove replication peer", removePeer));

    // Test concurrent add peer operation again
    assertEquals(1,
      countConcurrentSuccesses(threadNum, "Got exception when add replication peer", addPeer));
  }

  /**
   * Adding a peer with an invalid peer id or an invalid cluster key must be rejected.
   */
  @Test
  public void testAddInvalidPeer() {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    builder.setClusterKey(KEY_ONE);
    try {
      String invalidPeerId = "1-2";
      hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build());
      fail("Should fail as the peer id: " + invalidPeerId + " is invalid");
    } catch (Exception e) {
      // OK
    }

    try {
      String invalidClusterKey = "2181:/hbase";
      builder.setClusterKey(invalidClusterKey);
      hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
      fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid");
    } catch (Exception e) {
      // OK
    }
  }

  /**
   * Simple testing of adding and removing peers, basically shows that
   * all interactions with ZK work
   * @throws Exception if an operation that is expected to succeed fails
   */
  @Test
  public void testAddRemovePeer() throws Exception {
    ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
    rpc1.setClusterKey(KEY_ONE);
    ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
    rpc2.setClusterKey(KEY_SECOND);
    // Add a valid peer
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
    // try adding the same (fails)
    try {
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
      fail("Adding peer " + ID_ONE + " a second time should fail");
    } catch (Exception e) {
      // OK!
    }
    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
    // Try to remove an inexisting peer
    try {
      hbaseAdmin.removeReplicationPeer(ID_SECOND);
      fail();
    } catch (Exception e) {
      // OK!
    }
    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
    // Add a second since multi-slave is supported. Any exception propagates and fails the test
    // with its original cause.
    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
    assertEquals(2, hbaseAdmin.listReplicationPeers().size());
    // Remove the first peer we added
    hbaseAdmin.removeReplicationPeer(ID_ONE);
    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
    hbaseAdmin.removeReplicationPeer(ID_SECOND);
    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
  }

  /**
   * A peer added with an explicit enabled flag must report that state when listed.
   */
  @Test
  public void testAddPeerWithState() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
    hbaseAdmin.removeReplicationPeer(ID_ONE);

    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_SECOND);
    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
    hbaseAdmin.removeReplicationPeer(ID_SECOND);
  }

  /**
   * Tests that the peer configuration used by ReplicationAdmin contains all
   * the peer's properties.
   */
  @Test
  public void testPeerConfig() throws Exception {
    ReplicationPeerConfig config = new ReplicationPeerConfig();
    config.setClusterKey(KEY_ONE);
    config.getConfiguration().put("key1", "value1");
    config.getConfiguration().put("key2", "value2");
    hbaseAdmin.addReplicationPeer(ID_ONE, config);

    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
    assertEquals(1, peers.size());
    ReplicationPeerDescription peerOne = peers.get(0);
    assertNotNull(peerOne);
    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * Adding a peer must fail while a queue (normal or recovered) for the same peer id still
   * exists in the queue storage.
   */
  @Test
  public void testAddPeerWithUnDeletedQueues() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_SECOND);
    Configuration conf = TEST_UTIL.getConfiguration();
    ReplicationQueueStorage queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);

    ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
    // add queue for ID_ONE
    queueStorage.addWAL(serverName, ID_ONE, "file1");
    try {
      admin.addPeer(ID_ONE, rpc1, null);
      fail();
    } catch (Exception e) {
      // OK!
    }
    queueStorage.removeQueue(serverName, ID_ONE);
    assertEquals(0, queueStorage.getAllQueues(serverName).size());

    // add recovered queue for ID_ONE
    queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
    try {
      admin.addPeer(ID_ONE, rpc2, null);
      fail();
    } catch (Exception e) {
      // OK!
    }
  }

  /**
   * basic checks that when we add a peer that it is enabled, and that we can disable
   * @throws Exception if an operation that is expected to succeed fails
   */
  @Test
  public void testEnableDisable() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    admin.addPeer(ID_ONE, rpc1, null);
    assertEquals(1, admin.getPeersCount());
    assertTrue(admin.getPeerState(ID_ONE));
    admin.disablePeer(ID_ONE);

    assertFalse(admin.getPeerState(ID_ONE));
    try {
      admin.getPeerState(ID_SECOND);
      fail("Getting the state of the non-existent peer " + ID_SECOND + " should fail");
    } catch (ReplicationPeerNotFoundException e) {
      // OK!
    }
    admin.removePeer(ID_ONE);
  }

  /**
   * Appends table-cfs to a peer step by step and checks the accumulated configuration after
   * each step.
   */
  @Test
  public void testAppendPeerTableCFs() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");

    // Add a valid peer
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    // Update peer config, not replicate all user tables
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);

    Map<TableName, List<String>> tableCFs = new HashMap<>();
    tableCFs.put(tableName1, null);
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    Map<TableName, List<String>> result = readPeerTableCFs(ID_ONE);
    assertEquals(1, result.size());
    assertTrue(result.containsKey(tableName1));
    assertNull(result.get(tableName1));

    // append table t2 to replication
    tableCFs.clear();
    tableCFs.put(tableName2, null);
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = readPeerTableCFs(ID_ONE);
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));

    // append table column family: f1 of t3 to replication
    tableCFs.clear();
    tableCFs.put(tableName3, new ArrayList<>());
    tableCFs.get(tableName3).add("f1");
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = readPeerTableCFs(ID_ONE);
    assertEquals(3, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertTrue("Should contain t3", result.containsKey(tableName3));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));
    assertEquals(1, result.get(tableName3).size());
    assertEquals("f1", result.get(tableName3).get(0));

    // append table column families: f1 and f2 of t4 to replication
    tableCFs.clear();
    tableCFs.put(tableName4, new ArrayList<>());
    tableCFs.get(tableName4).add("f1");
    tableCFs.get(tableName4).add("f2");
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = readPeerTableCFs(ID_ONE);
    assertEquals(4, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertTrue("Should contain t3", result.containsKey(tableName3));
    assertTrue("Should contain t4", result.containsKey(tableName4));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));
    assertEquals(1, result.get(tableName3).size());
    assertEquals("f1", result.get(tableName3).get(0));
    assertEquals(2, result.get(tableName4).size());
    assertEquals("f1", result.get(tableName4).get(0));
    assertEquals("f2", result.get(tableName4).get(1));

    // append "table5" => [], then append "table5" => ["f1"]
    tableCFs.clear();
    tableCFs.put(tableName5, new ArrayList<>());
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    tableCFs.clear();
    tableCFs.put(tableName5, new ArrayList<>());
    tableCFs.get(tableName5).add("f1");
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = readPeerTableCFs(ID_ONE);
    assertEquals(5, result.size());
    assertTrue("Should contain t5", result.containsKey(tableName5));
    // null means replication all cfs of tab5
    assertNull(result.get(tableName5));

    // append "table6" => ["f1"], then append "table6" => []
    tableCFs.clear();
    tableCFs.put(tableName6, new ArrayList<>());
    tableCFs.get(tableName6).add("f1");
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    tableCFs.clear();
    tableCFs.put(tableName6, new ArrayList<>());
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = readPeerTableCFs(ID_ONE);
    assertEquals(6, result.size());
    assertTrue("Should contain t6", result.containsKey(tableName6));
    // null means replication all cfs of tab6
    assertNull(result.get(tableName6));

    admin.removePeer(ID_ONE);
  }

  /**
   * Removes table-cfs from a peer and checks both the rejected removals (entries that are not
   * configured, or that do not match the configured column families) and the accepted ones.
   */
  @Test
  public void testRemovePeerTableCFs() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");

    // Add a valid peer
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    // Update peer config, not replicate all user tables
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);

    // Removing from a peer that has no table-cfs configured must fail
    Map<TableName, List<String>> tableCFs = new HashMap<>();
    try {
      tableCFs.put(tableName3, null);
      admin.removePeerTableCFs(ID_ONE, tableCFs);
      fail("Removing table-cfs from a peer without table-cfs config should fail");
    } catch (ReplicationException e) {
      // expected
    }
    assertNull(admin.getPeerTableCFs(ID_ONE));

    tableCFs.clear();
    tableCFs.put(tableName1, null);
    tableCFs.put(tableName2, new ArrayList<>());
    tableCFs.get(tableName2).add("cf1");
    admin.setPeerTableCFs(ID_ONE, tableCFs);
    // Removing a table that is not part of the config must fail and leave the config untouched
    try {
      tableCFs.clear();
      tableCFs.put(tableName3, null);
      admin.removePeerTableCFs(ID_ONE, tableCFs);
      fail("Removing a table which is not in the peer's table-cfs config should fail");
    } catch (ReplicationException e) {
      // expected
    }
    Map<TableName, List<String>> result = readPeerTableCFs(ID_ONE);
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertNull(result.get(tableName1));
    assertEquals(1, result.get(tableName2).size());
    assertEquals("cf1", result.get(tableName2).get(0));

    // t1 is configured with null cfs, so removing a specific cf of t1 must fail
    try {
      tableCFs.clear();
      tableCFs.put(tableName1, new ArrayList<>());
      tableCFs.get(tableName1).add("f1");
      admin.removePeerTableCFs(ID_ONE, tableCFs);
      fail("Removing a cf from a table configured without cfs should fail");
    } catch (ReplicationException e) {
      // expected
    }
    tableCFs.clear();
    tableCFs.put(tableName1, null);
    admin.removePeerTableCFs(ID_ONE, tableCFs);
    result = readPeerTableCFs(ID_ONE);
    assertEquals(1, result.size());
    assertEquals(1, result.get(tableName2).size());
    assertEquals("cf1", result.get(tableName2).get(0));

    // t2 is configured with cf1 only, so removing t2 with null cfs must fail
    try {
      tableCFs.clear();
      tableCFs.put(tableName2, null);
      admin.removePeerTableCFs(ID_ONE, tableCFs);
      fail();
    } catch (ReplicationException e) {
      // expected
    }
    tableCFs.clear();
    tableCFs.put(tableName2, new ArrayList<>());
    tableCFs.get(tableName2).add("cf1");
    admin.removePeerTableCFs(ID_ONE, tableCFs);
    assertNull(admin.getPeerTableCFs(ID_ONE));

    tableCFs.clear();
    tableCFs.put(tableName4, new ArrayList<>());
    admin.setPeerTableCFs(ID_ONE, tableCFs);
    admin.removePeerTableCFs(ID_ONE, tableCFs);
    assertNull(admin.getPeerTableCFs(ID_ONE));

    admin.removePeer(ID_ONE);
  }

  /**
   * The namespaces set on a peer config must be returned unchanged after an update.
   */
  @Test
  public void testSetPeerNamespaces() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    namespaces.add(ns2);
    rpc.setNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
    assertEquals(2, namespaces.size());
    assertTrue(namespaces.contains(ns1));
    assertTrue(namespaces.contains(ns2));

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
    assertEquals(1, namespaces.size());
    assertTrue(namespaces.contains(ns1));

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * The replicate-all flag defaults to true and can be switched off and on again.
   */
  @Test
  public void testSetReplicateAllUserTables() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertTrue(rpc.replicateAllUserTables());

    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertFalse(rpc.replicateAllUserTables());

    rpc.setReplicateAllUserTables(true);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertTrue(rpc.replicateAllUserTables());

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * The exclude-namespaces set on a peer config must be returned unchanged after an update.
   */
  @Test
  public void testPeerExcludeNamespaces() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertTrue(rpc.replicateAllUserTables());

    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    namespaces.add(ns2);
    rpc.setExcludeNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
    assertEquals(2, namespaces.size());
    assertTrue(namespaces.contains(ns1));
    assertTrue(namespaces.contains(ns2));

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setExcludeNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
    assertEquals(1, namespaces.size());
    assertTrue(namespaces.contains(ns1));

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * The exclude-table-cfs map set on a peer config must be returned unchanged after an update.
   */
  @Test
  public void testPeerExcludeTableCFs() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    TableName tab1 = TableName.valueOf("t1");
    TableName tab2 = TableName.valueOf("t2");
    TableName tab3 = TableName.valueOf("t3");
    TableName tab4 = TableName.valueOf("t4");

    // Add a valid peer
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertTrue(rpc.replicateAllUserTables());

    Map<TableName, List<String>> tableCFs = new HashMap<>();
    tableCFs.put(tab1, null);
    rpc.setExcludeTableCFsMap(tableCFs);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    Map<TableName, List<String>> result =
        hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
    assertEquals(1, result.size());
    assertTrue(result.containsKey(tab1));
    assertNull(result.get(tab1));

    tableCFs.put(tab2, new ArrayList<>());
    tableCFs.get(tab2).add("f1");
    rpc.setExcludeTableCFsMap(tableCFs);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tab1));
    assertTrue("Should contain t2", result.containsKey(tab2));
    assertNull(result.get(tab1));
    assertEquals(1, result.get(tab2).size());
    assertEquals("f1", result.get(tab2).get(0));

    tableCFs.clear();
    tableCFs.put(tab3, new ArrayList<>());
    tableCFs.put(tab4, new ArrayList<>());
    tableCFs.get(tab4).add("f1");
    tableCFs.get(tab4).add("f2");
    rpc.setExcludeTableCFsMap(tableCFs);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
    assertEquals(2, result.size());
    assertTrue("Should contain t3", result.containsKey(tab3));
    assertTrue("Should contain t4", result.containsKey(tab4));
    assertNull(result.get(tab3));
    assertEquals(2, result.get(tab4).size());
    assertEquals("f1", result.get(tab4).get(0));
    assertEquals("f2", result.get(tab4).get(1));

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * Namespaces/table-cfs may only be configured when replicate_all is false, and exclude
   * namespaces/exclude table-cfs only when replicate_all is true.
   */
  @Test
  public void testPeerConfigConflict() throws Exception {
    // Default replicate_all flag is true
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);

    String ns1 = "ns1";
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);

    TableName tab1 = TableName.valueOf("ns2:tabl");
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tab1, new ArrayList<>());

    try {
      rpc.setNamespaces(namespaces);
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
      fail("Should throw Exception."
          + " When replicate all flag is true, no need to config namespaces");
    } catch (IOException e) {
      // OK
      rpc.setNamespaces(null);
    }

    try {
      rpc.setTableCFsMap(tableCfs);
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
      fail("Should throw Exception."
          + " When replicate all flag is true, no need to config table-cfs");
    } catch (IOException e) {
      // OK
      rpc.setTableCFsMap(null);
    }

    // Set replicate_all flag to false
    rpc.setReplicateAllUserTables(false);
    try {
      rpc.setExcludeNamespaces(namespaces);
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
      fail("Should throw Exception."
          + " When replicate all flag is false, no need to config exclude namespaces");
    } catch (IOException e) {
      // OK
      rpc.setExcludeNamespaces(null);
    }

    try {
      rpc.setExcludeTableCFsMap(tableCfs);
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
      fail("Should throw Exception."
          + " When replicate all flag is false, no need to config exclude table-cfs");
    } catch (IOException e) {
      // OK
      rpc.setExcludeTableCFsMap(null);
    }

    rpc.setNamespaces(namespaces);
    rpc.setTableCFsMap(tableCfs);
    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    // Default replicate_all flag is true
    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_SECOND);
    rpc2.setExcludeNamespaces(namespaces);
    rpc2.setExcludeTableCFsMap(tableCfs);
    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
    // table-cfs config
    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);

    hbaseAdmin.removeReplicationPeer(ID_ONE);
    hbaseAdmin.removeReplicationPeer(ID_SECOND);
  }

  /**
   * A table may not be configured together with its own namespace, neither in the
   * namespaces/table-cfs config nor in the exclude-namespaces/exclude-table-cfs config.
   */
  @Test
  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";
    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    try {
      Map<TableName, List<String>> tableCfs = new HashMap<>();
      tableCfs.put(tableName1, new ArrayList<>());
      rpc.setTableCFsMap(tableCfs);
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
      fail("Should throw ReplicationException" + " Because table " + tableName1
          + " conflict with namespace " + ns1);
    } catch (Exception e) {
      // OK
    }

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName2, new ArrayList<>());
    rpc.setTableCFsMap(tableCfs);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    try {
      namespaces.clear();
      namespaces.add(ns2);
      rpc.setNamespaces(namespaces);
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
      fail("Should throw ReplicationException" + " Because namespace " + ns2
          + " conflict with table " + tableName2);
    } catch (Exception e) {
      // OK
    }

    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_SECOND);
    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);

    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
    Set<String> excludeNamespaces = new HashSet<>();
    excludeNamespaces.add(ns1);
    rpc2.setExcludeNamespaces(excludeNamespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
    try {
      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
      excludeTableCfs.put(tableName1, new ArrayList<>());
      rpc2.setExcludeTableCFsMap(excludeTableCfs);
      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
          + " conflict with exclude namespace " + ns1);
    } catch (Exception e) {
      // OK
    }

    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
    excludeTableCfs.put(tableName2, new ArrayList<>());
    rpc2.setExcludeTableCFsMap(excludeTableCfs);
    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
    try {
      // The *exclude* namespaces must be set here: the scenario under test is an exclude
      // namespace clashing with an already configured exclude table. Setting plain namespaces
      // would be rejected for a different reason (replicate_all is true for this peer) and the
      // exclude conflict would never be exercised.
      excludeNamespaces = new HashSet<>();
      excludeNamespaces.add(ns2);
      rpc2.setExcludeNamespaces(excludeNamespaces);
      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
          + " conflict with exclude table " + tableName2);
    } catch (Exception e) {
      // OK
    }

    hbaseAdmin.removeReplicationPeer(ID_ONE);
    hbaseAdmin.removeReplicationPeer(ID_SECOND);
  }

  /**
   * The peer bandwidth defaults to 0 and an updated value must be readable afterwards.
   */
  @Test
  public void testPeerBandwidth() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = admin.getPeerConfig(ID_ONE);
    assertEquals(0, rpc.getBandwidth());

    rpc.setBandwidth(2097152);
    admin.updatePeerConfig(ID_ONE, rpc);

    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
    admin.removePeer(ID_ONE);
  }

  /**
   * Changing the cluster key of an existing peer must be rejected.
   */
  @Test
  public void testPeerClusterKey() throws Exception {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    builder.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());

    try {
      builder.setClusterKey(KEY_SECOND);
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
      fail("Change cluster key on an existing peer is not allowed");
    } catch (Exception e) {
      // OK
    }
  }

  /**
   * Changing, removing or adding the replication endpoint implementation of an existing peer
   * must be rejected.
   */
  @Test
  public void testPeerReplicationEndpointImpl() throws Exception {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    builder.setClusterKey(KEY_ONE);
    builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());

    // Switching to a different endpoint implementation
    try {
      builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
      fail("Change replication endpoint implementation class on an existing peer is not allowed");
    } catch (Exception e) {
      // OK
    }

    // Dropping the endpoint implementation
    try {
      builder = ReplicationPeerConfig.newBuilder();
      builder.setClusterKey(KEY_ONE);
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
      fail("Change replication endpoint implementation class on an existing peer is not allowed");
    } catch (Exception e) {
      // OK
    }

    builder = ReplicationPeerConfig.newBuilder();
    builder.setClusterKey(KEY_SECOND);
    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());

    // Setting an endpoint implementation on a peer that was created without one
    try {
      builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
      fail("Change replication endpoint implementation class on an existing peer is not allowed");
    } catch (Exception e) {
      // OK
    }
  }
}