001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034import java.util.concurrent.atomic.AtomicLong; 035import java.util.regex.Pattern; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.Admin; 044import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint; 045import org.apache.hadoop.hbase.replication.ReplicationException; 046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 047import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 051import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest; 052import org.apache.hadoop.hbase.testclassification.ClientTests; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.junit.After; 055import org.junit.AfterClass; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Rule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.rules.TestName; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * Unit testing of ReplicationAdmin 067 */ 068@Category({MediumTests.class, ClientTests.class}) 069public class TestReplicationAdmin { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestReplicationAdmin.class); 074 075 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class); 076 077 private final static HBaseTestingUtility TEST_UTIL = 078 new HBaseTestingUtility(); 079 080 private final String ID_ONE = "1"; 081 private final String KEY_ONE = "127.0.0.1:2181:/hbase"; 082 private final String ID_SECOND = "2"; 083 private final String KEY_SECOND = "127.0.0.1:2181:/hbase2"; 084 085 private static ReplicationAdmin admin; 086 private static Admin hbaseAdmin; 087 088 @Rule 089 public TestName name = new TestName(); 090 091 /** 092 * @throws java.lang.Exception 093 */ 094 @BeforeClass 095 public static void setUpBeforeClass() throws Exception { 096 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 097 TEST_UTIL.startMiniCluster(); 098 admin = new ReplicationAdmin(TEST_UTIL.getConfiguration()); 099 hbaseAdmin = TEST_UTIL.getAdmin(); 100 } 101 102 @AfterClass 103 public static void tearDownAfterClass() throws Exception { 104 if (admin != null) { 105 admin.close(); 106 } 107 TEST_UTIL.shutdownMiniCluster(); 108 } 109 110 @After 111 public void tearDown() throws Exception { 112 for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) { 113 hbaseAdmin.removeReplicationPeer(desc.getPeerId()); 114 } 115 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 116 .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); 117 for (ServerName serverName : queueStorage.getListOfReplicators()) { 118 for (String queue : queueStorage.getAllQueues(serverName)) { 119 queueStorage.removeQueue(serverName, queue); 120 } 121 queueStorage.removeReplicatorIfQueueIsEmpty(serverName); 122 } 123 } 124 125 @Test 126 public void testConcurrentPeerOperations() throws Exception { 127 int threadNum = 5; 128 AtomicLong successCount = new AtomicLong(0); 129 130 // Test concurrent add peer operation 131 Thread[] addPeers = new Thread[threadNum]; 132 for (int i = 0; i < threadNum; i++) { 133 addPeers[i] = new Thread(() -> { 134 try { 135 hbaseAdmin.addReplicationPeer(ID_ONE, 136 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); 137 successCount.incrementAndGet(); 138 } catch (Exception e) { 139 LOG.debug("Got exception when add replication peer", e); 140 } 141 }); 142 addPeers[i].start(); 143 } 144 for (Thread addPeer : addPeers) { 145 addPeer.join(); 146 } 147 assertEquals(1, successCount.get()); 148 149 // Test concurrent remove peer operation 150 successCount.set(0); 151 Thread[] removePeers = new Thread[threadNum]; 152 for (int i = 0; i < threadNum; i++) { 153 removePeers[i] = new Thread(() -> { 154 try { 155 hbaseAdmin.removeReplicationPeer(ID_ONE); 156 successCount.incrementAndGet(); 157 } catch (Exception e) { 158 LOG.debug("Got exception when remove replication peer", e); 159 } 160 }); 161 removePeers[i].start(); 162 } 163 for (Thread removePeer : removePeers) { 164 removePeer.join(); 165 } 166 assertEquals(1, successCount.get()); 167 168 // Test concurrent add peer operation again 169 successCount.set(0); 170 addPeers = new Thread[threadNum]; 171 for (int i = 0; i < threadNum; i++) { 172 addPeers[i] = new Thread(() -> { 173 try { 174 hbaseAdmin.addReplicationPeer(ID_ONE, 175 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()); 176 successCount.incrementAndGet(); 177 } catch (Exception e) { 178 LOG.debug("Got exception when add replication peer", e); 179 } 180 }); 181 addPeers[i].start(); 182 } 183 for (Thread addPeer : addPeers) { 184 addPeer.join(); 185 } 186 assertEquals(1, successCount.get()); 187 } 188 189 @Test 190 public void testAddInvalidPeer() { 191 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 192 builder.setClusterKey(KEY_ONE); 193 try { 194 String invalidPeerId = "1-2"; 195 hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build()); 196 fail("Should fail as the peer id: " + invalidPeerId + " is invalid"); 197 } catch (Exception e) { 198 // OK 199 } 200 201 try { 202 String invalidClusterKey = "2181:/hbase"; 203 builder.setClusterKey(invalidClusterKey); 204 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 205 fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid"); 206 } catch (Exception e) { 207 // OK 208 } 209 } 210 211 /** 212 * Simple testing of adding and removing peers, basically shows that 213 * all interactions with ZK work 214 * @throws Exception 215 */ 216 @Test 217 public void testAddRemovePeer() throws Exception { 218 ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder(); 219 rpc1.setClusterKey(KEY_ONE); 220 ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder(); 221 rpc2.setClusterKey(KEY_SECOND); 222 // Add a valid peer 223 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build()); 224 // try adding the same (fails) 225 try { 226 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build()); 227 } catch (Exception e) { 228 // OK! 229 } 230 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 231 // Try to remove an inexisting peer 232 try { 233 hbaseAdmin.removeReplicationPeer(ID_SECOND); 234 fail(); 235 } catch (Exception e) { 236 // OK! 237 } 238 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 239 // Add a second since multi-slave is supported 240 try { 241 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build()); 242 } catch (Exception e) { 243 fail(); 244 } 245 assertEquals(2, hbaseAdmin.listReplicationPeers().size()); 246 // Remove the first peer we added 247 hbaseAdmin.removeReplicationPeer(ID_ONE); 248 assertEquals(1, hbaseAdmin.listReplicationPeers().size()); 249 hbaseAdmin.removeReplicationPeer(ID_SECOND); 250 assertEquals(0, hbaseAdmin.listReplicationPeers().size()); 251 } 252 253 @Test 254 public void testAddPeerWithState() throws Exception { 255 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 256 rpc1.setClusterKey(KEY_ONE); 257 hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true); 258 assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled()); 259 hbaseAdmin.removeReplicationPeer(ID_ONE); 260 261 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 262 rpc2.setClusterKey(KEY_SECOND); 263 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false); 264 assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled()); 265 hbaseAdmin.removeReplicationPeer(ID_SECOND); 266 } 267 268 /** 269 * Tests that the peer configuration used by ReplicationAdmin contains all 270 * the peer's properties. 271 */ 272 @Test 273 public void testPeerConfig() throws Exception { 274 ReplicationPeerConfig config = new ReplicationPeerConfig(); 275 config.setClusterKey(KEY_ONE); 276 config.getConfiguration().put("key1", "value1"); 277 config.getConfiguration().put("key2", "value2"); 278 hbaseAdmin.addReplicationPeer(ID_ONE, config); 279 280 List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers(); 281 assertEquals(1, peers.size()); 282 ReplicationPeerDescription peerOne = peers.get(0); 283 assertNotNull(peerOne); 284 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 285 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 286 287 hbaseAdmin.removeReplicationPeer(ID_ONE); 288 } 289 290 @Test 291 public void testAddPeerWithUnDeletedQueues() throws Exception { 292 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 293 rpc1.setClusterKey(KEY_ONE); 294 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 295 rpc2.setClusterKey(KEY_SECOND); 296 Configuration conf = TEST_UTIL.getConfiguration(); 297 ReplicationQueueStorage queueStorage = 298 ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf); 299 300 ServerName serverName = ServerName.valueOf("server1", 8000, 1234); 301 // add queue for ID_ONE 302 queueStorage.addWAL(serverName, ID_ONE, "file1"); 303 try { 304 admin.addPeer(ID_ONE, rpc1, null); 305 fail(); 306 } catch (Exception e) { 307 // OK! 308 } 309 queueStorage.removeQueue(serverName, ID_ONE); 310 assertEquals(0, queueStorage.getAllQueues(serverName).size()); 311 312 // add recovered queue for ID_ONE 313 queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1"); 314 try { 315 admin.addPeer(ID_ONE, rpc2, null); 316 fail(); 317 } catch (Exception e) { 318 // OK! 319 } 320 } 321 322 /** 323 * basic checks that when we add a peer that it is enabled, and that we can disable 324 * @throws Exception 325 */ 326 @Test 327 public void testEnableDisable() throws Exception { 328 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 329 rpc1.setClusterKey(KEY_ONE); 330 admin.addPeer(ID_ONE, rpc1, null); 331 assertEquals(1, admin.getPeersCount()); 332 assertTrue(admin.getPeerState(ID_ONE)); 333 admin.disablePeer(ID_ONE); 334 335 assertFalse(admin.getPeerState(ID_ONE)); 336 try { 337 admin.getPeerState(ID_SECOND); 338 } catch (ReplicationPeerNotFoundException e) { 339 // OK! 340 } 341 admin.removePeer(ID_ONE); 342 } 343 344 @Test 345 public void testAppendPeerTableCFs() throws Exception { 346 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 347 rpc.setClusterKey(KEY_ONE); 348 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); 349 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); 350 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); 351 final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); 352 final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5"); 353 final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6"); 354 355 // Add a valid peer 356 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 357 358 // Update peer config, not replicate all user tables 359 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 360 rpc.setReplicateAllUserTables(false); 361 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 362 363 Map<TableName, List<String>> tableCFs = new HashMap<>(); 364 tableCFs.put(tableName1, null); 365 admin.appendPeerTableCFs(ID_ONE, tableCFs); 366 Map<TableName, List<String>> result = 367 ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 368 assertEquals(1, result.size()); 369 assertEquals(true, result.containsKey(tableName1)); 370 assertNull(result.get(tableName1)); 371 372 // append table t2 to replication 373 tableCFs.clear(); 374 tableCFs.put(tableName2, null); 375 admin.appendPeerTableCFs(ID_ONE, tableCFs); 376 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 377 assertEquals(2, result.size()); 378 assertTrue("Should contain t1", result.containsKey(tableName1)); 379 assertTrue("Should contain t2", result.containsKey(tableName2)); 380 assertNull(result.get(tableName1)); 381 assertNull(result.get(tableName2)); 382 383 // append table column family: f1 of t3 to replication 384 tableCFs.clear(); 385 tableCFs.put(tableName3, new ArrayList<>()); 386 tableCFs.get(tableName3).add("f1"); 387 admin.appendPeerTableCFs(ID_ONE, tableCFs); 388 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 389 assertEquals(3, result.size()); 390 assertTrue("Should contain t1", result.containsKey(tableName1)); 391 assertTrue("Should contain t2", result.containsKey(tableName2)); 392 assertTrue("Should contain t3", result.containsKey(tableName3)); 393 assertNull(result.get(tableName1)); 394 assertNull(result.get(tableName2)); 395 assertEquals(1, result.get(tableName3).size()); 396 assertEquals("f1", result.get(tableName3).get(0)); 397 398 tableCFs.clear(); 399 tableCFs.put(tableName4, new ArrayList<>()); 400 tableCFs.get(tableName4).add("f1"); 401 tableCFs.get(tableName4).add("f2"); 402 admin.appendPeerTableCFs(ID_ONE, tableCFs); 403 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 404 assertEquals(4, result.size()); 405 assertTrue("Should contain t1", result.containsKey(tableName1)); 406 assertTrue("Should contain t2", result.containsKey(tableName2)); 407 assertTrue("Should contain t3", result.containsKey(tableName3)); 408 assertTrue("Should contain t4", result.containsKey(tableName4)); 409 assertNull(result.get(tableName1)); 410 assertNull(result.get(tableName2)); 411 assertEquals(1, result.get(tableName3).size()); 412 assertEquals("f1", result.get(tableName3).get(0)); 413 assertEquals(2, result.get(tableName4).size()); 414 assertEquals("f1", result.get(tableName4).get(0)); 415 assertEquals("f2", result.get(tableName4).get(1)); 416 417 // append "table5" => [], then append "table5" => ["f1"] 418 tableCFs.clear(); 419 tableCFs.put(tableName5, new ArrayList<>()); 420 admin.appendPeerTableCFs(ID_ONE, tableCFs); 421 tableCFs.clear(); 422 tableCFs.put(tableName5, new ArrayList<>()); 423 tableCFs.get(tableName5).add("f1"); 424 admin.appendPeerTableCFs(ID_ONE, tableCFs); 425 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 426 assertEquals(5, result.size()); 427 assertTrue("Should contain t5", result.containsKey(tableName5)); 428 // null means replication all cfs of tab5 429 assertNull(result.get(tableName5)); 430 431 // append "table6" => ["f1"], then append "table6" => [] 432 tableCFs.clear(); 433 tableCFs.put(tableName6, new ArrayList<>()); 434 tableCFs.get(tableName6).add("f1"); 435 admin.appendPeerTableCFs(ID_ONE, tableCFs); 436 tableCFs.clear(); 437 tableCFs.put(tableName6, new ArrayList<>()); 438 admin.appendPeerTableCFs(ID_ONE, tableCFs); 439 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 440 assertEquals(6, result.size()); 441 assertTrue("Should contain t6", result.containsKey(tableName6)); 442 // null means replication all cfs of tab6 443 assertNull(result.get(tableName6)); 444 445 admin.removePeer(ID_ONE); 446 } 447 448 @Test 449 public void testRemovePeerTableCFs() throws Exception { 450 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 451 rpc.setClusterKey(KEY_ONE); 452 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1"); 453 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2"); 454 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3"); 455 final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4"); 456 457 // Add a valid peer 458 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 459 460 // Update peer config, not replicate all user tables 461 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 462 rpc.setReplicateAllUserTables(false); 463 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 464 465 Map<TableName, List<String>> tableCFs = new HashMap<>(); 466 try { 467 tableCFs.put(tableName3, null); 468 admin.removePeerTableCFs(ID_ONE, tableCFs); 469 assertTrue(false); 470 } catch (ReplicationException e) { 471 } 472 assertNull(admin.getPeerTableCFs(ID_ONE)); 473 474 tableCFs.clear(); 475 tableCFs.put(tableName1, null); 476 tableCFs.put(tableName2, new ArrayList<>()); 477 tableCFs.get(tableName2).add("cf1"); 478 admin.setPeerTableCFs(ID_ONE, tableCFs); 479 try { 480 tableCFs.clear(); 481 tableCFs.put(tableName3, null); 482 admin.removePeerTableCFs(ID_ONE, tableCFs); 483 assertTrue(false); 484 } catch (ReplicationException e) { 485 } 486 Map<TableName, List<String>> result = 487 ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 488 assertEquals(2, result.size()); 489 assertTrue("Should contain t1", result.containsKey(tableName1)); 490 assertTrue("Should contain t2", result.containsKey(tableName2)); 491 assertNull(result.get(tableName1)); 492 assertEquals(1, result.get(tableName2).size()); 493 assertEquals("cf1", result.get(tableName2).get(0)); 494 495 try { 496 tableCFs.clear(); 497 tableCFs.put(tableName1, new ArrayList<>()); 498 tableCFs.get(tableName1).add("f1"); 499 admin.removePeerTableCFs(ID_ONE, tableCFs); 500 assertTrue(false); 501 } catch (ReplicationException e) { 502 } 503 tableCFs.clear(); 504 tableCFs.put(tableName1, null); 505 admin.removePeerTableCFs(ID_ONE, tableCFs); 506 result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); 507 assertEquals(1, result.size()); 508 assertEquals(1, result.get(tableName2).size()); 509 assertEquals("cf1", result.get(tableName2).get(0)); 510 511 try { 512 tableCFs.clear(); 513 tableCFs.put(tableName2, null); 514 admin.removePeerTableCFs(ID_ONE, tableCFs); 515 fail(); 516 } catch (ReplicationException e) { 517 } 518 tableCFs.clear(); 519 tableCFs.put(tableName2, new ArrayList<>()); 520 tableCFs.get(tableName2).add("cf1"); 521 admin.removePeerTableCFs(ID_ONE, tableCFs); 522 assertNull(admin.getPeerTableCFs(ID_ONE)); 523 524 tableCFs.clear(); 525 tableCFs.put(tableName4, new ArrayList<>()); 526 admin.setPeerTableCFs(ID_ONE, tableCFs); 527 admin.removePeerTableCFs(ID_ONE, tableCFs); 528 assertNull(admin.getPeerTableCFs(ID_ONE)); 529 530 admin.removePeer(ID_ONE); 531 } 532 533 @Test 534 public void testSetPeerNamespaces() throws Exception { 535 String ns1 = "ns1"; 536 String ns2 = "ns2"; 537 538 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 539 rpc.setClusterKey(KEY_ONE); 540 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 541 542 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 543 rpc.setReplicateAllUserTables(false); 544 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 545 546 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 547 Set<String> namespaces = new HashSet<>(); 548 namespaces.add(ns1); 549 namespaces.add(ns2); 550 rpc.setNamespaces(namespaces); 551 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 552 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); 553 assertEquals(2, namespaces.size()); 554 assertTrue(namespaces.contains(ns1)); 555 assertTrue(namespaces.contains(ns2)); 556 557 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 558 namespaces = new HashSet<>(); 559 namespaces.add(ns1); 560 rpc.setNamespaces(namespaces); 561 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 562 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces(); 563 assertEquals(1, namespaces.size()); 564 assertTrue(namespaces.contains(ns1)); 565 566 hbaseAdmin.removeReplicationPeer(ID_ONE); 567 } 568 569 @Test 570 public void testSetReplicateAllUserTables() throws Exception { 571 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 572 rpc.setClusterKey(KEY_ONE); 573 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 574 575 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 576 assertTrue(rpc.replicateAllUserTables()); 577 578 rpc.setReplicateAllUserTables(false); 579 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 580 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 581 assertFalse(rpc.replicateAllUserTables()); 582 583 rpc.setReplicateAllUserTables(true); 584 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 585 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 586 assertTrue(rpc.replicateAllUserTables()); 587 588 hbaseAdmin.removeReplicationPeer(ID_ONE); 589 } 590 591 @Test 592 public void testPeerExcludeNamespaces() throws Exception { 593 String ns1 = "ns1"; 594 String ns2 = "ns2"; 595 596 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 597 rpc.setClusterKey(KEY_ONE); 598 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 599 600 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 601 assertTrue(rpc.replicateAllUserTables()); 602 603 Set<String> namespaces = new HashSet<String>(); 604 namespaces.add(ns1); 605 namespaces.add(ns2); 606 rpc.setExcludeNamespaces(namespaces); 607 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 608 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); 609 assertEquals(2, namespaces.size()); 610 assertTrue(namespaces.contains(ns1)); 611 assertTrue(namespaces.contains(ns2)); 612 613 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 614 namespaces = new HashSet<String>(); 615 namespaces.add(ns1); 616 rpc.setExcludeNamespaces(namespaces); 617 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 618 namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces(); 619 assertEquals(1, namespaces.size()); 620 assertTrue(namespaces.contains(ns1)); 621 622 hbaseAdmin.removeReplicationPeer(ID_ONE); 623 } 624 625 @Test 626 public void testPeerExcludeTableCFs() throws Exception { 627 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 628 rpc.setClusterKey(KEY_ONE); 629 TableName tab1 = TableName.valueOf("t1"); 630 TableName tab2 = TableName.valueOf("t2"); 631 TableName tab3 = TableName.valueOf("t3"); 632 TableName tab4 = TableName.valueOf("t4"); 633 634 // Add a valid peer 635 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 636 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 637 assertTrue(rpc.replicateAllUserTables()); 638 639 Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>(); 640 tableCFs.put(tab1, null); 641 rpc.setExcludeTableCFsMap(tableCFs); 642 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 643 Map<TableName, List<String>> result = 644 hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 645 assertEquals(1, result.size()); 646 assertEquals(true, result.containsKey(tab1)); 647 assertNull(result.get(tab1)); 648 649 tableCFs.put(tab2, new ArrayList<String>()); 650 tableCFs.get(tab2).add("f1"); 651 rpc.setExcludeTableCFsMap(tableCFs); 652 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 653 result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 654 assertEquals(2, result.size()); 655 assertTrue("Should contain t1", result.containsKey(tab1)); 656 assertTrue("Should contain t2", result.containsKey(tab2)); 657 assertNull(result.get(tab1)); 658 assertEquals(1, result.get(tab2).size()); 659 assertEquals("f1", result.get(tab2).get(0)); 660 661 tableCFs.clear(); 662 tableCFs.put(tab3, new ArrayList<String>()); 663 tableCFs.put(tab4, new ArrayList<String>()); 664 tableCFs.get(tab4).add("f1"); 665 tableCFs.get(tab4).add("f2"); 666 rpc.setExcludeTableCFsMap(tableCFs); 667 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 668 result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap(); 669 assertEquals(2, result.size()); 670 assertTrue("Should contain t3", result.containsKey(tab3)); 671 assertTrue("Should contain t4", result.containsKey(tab4)); 672 assertNull(result.get(tab3)); 673 assertEquals(2, result.get(tab4).size()); 674 assertEquals("f1", result.get(tab4).get(0)); 675 assertEquals("f2", result.get(tab4).get(1)); 676 677 hbaseAdmin.removeReplicationPeer(ID_ONE); 678 } 679 680 @Test 681 public void testPeerConfigConflict() throws Exception { 682 // Default replicate_all flag is true 683 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 684 rpc.setClusterKey(KEY_ONE); 685 686 String ns1 = "ns1"; 687 Set<String> namespaces = new HashSet<String>(); 688 namespaces.add(ns1); 689 690 TableName tab1 = TableName.valueOf("ns2:tabl"); 691 Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>(); 692 tableCfs.put(tab1, new ArrayList<String>()); 693 694 try { 695 rpc.setNamespaces(namespaces); 696 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 697 fail("Should throw Exception." 698 + " When replicate all flag is true, no need to config namespaces"); 699 } catch (IOException e) { 700 // OK 701 rpc.setNamespaces(null); 702 } 703 704 try { 705 rpc.setTableCFsMap(tableCfs); 706 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 707 fail("Should throw Exception." 708 + " When replicate all flag is true, no need to config table-cfs"); 709 } catch (IOException e) { 710 // OK 711 rpc.setTableCFsMap(null); 712 } 713 714 // Set replicate_all flag to true 715 rpc.setReplicateAllUserTables(false); 716 try { 717 rpc.setExcludeNamespaces(namespaces); 718 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 719 fail("Should throw Exception." 720 + " When replicate all flag is false, no need to config exclude namespaces"); 721 } catch (IOException e) { 722 // OK 723 rpc.setExcludeNamespaces(null); 724 } 725 726 try { 727 rpc.setExcludeTableCFsMap(tableCfs); 728 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 729 fail("Should throw Exception." 730 + " When replicate all flag is false, no need to config exclude table-cfs"); 731 } catch (IOException e) { 732 // OK 733 rpc.setExcludeTableCFsMap(null); 734 } 735 736 rpc.setNamespaces(namespaces); 737 rpc.setTableCFsMap(tableCfs); 738 // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config 739 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 740 741 // Default replicate_all flag is true 742 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 743 rpc2.setClusterKey(KEY_SECOND); 744 rpc2.setExcludeNamespaces(namespaces); 745 rpc2.setExcludeTableCFsMap(tableCfs); 746 // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude 747 // table-cfs config 748 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); 749 750 hbaseAdmin.removeReplicationPeer(ID_ONE); 751 hbaseAdmin.removeReplicationPeer(ID_SECOND); 752 } 753 754 @Test 755 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 756 String ns1 = "ns1"; 757 String ns2 = "ns2"; 758 final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName()); 759 final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2"); 760 761 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 762 rpc.setClusterKey(KEY_ONE); 763 rpc.setReplicateAllUserTables(false); 764 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 765 766 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 767 Set<String> namespaces = new HashSet<String>(); 768 namespaces.add(ns1); 769 rpc.setNamespaces(namespaces); 770 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 771 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 772 try { 773 Map<TableName, List<String>> tableCfs = new HashMap<>(); 774 tableCfs.put(tableName1, new ArrayList<>()); 775 rpc.setTableCFsMap(tableCfs); 776 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 777 fail("Should throw ReplicationException" + " Because table " + tableName1 778 + " conflict with namespace " + ns1); 779 } catch (Exception e) { 780 // OK 781 } 782 783 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 784 Map<TableName, List<String>> tableCfs = new HashMap<>(); 785 tableCfs.put(tableName2, new ArrayList<>()); 786 rpc.setTableCFsMap(tableCfs); 787 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 788 rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); 789 try { 790 namespaces.clear(); 791 namespaces.add(ns2); 792 rpc.setNamespaces(namespaces); 793 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); 794 fail("Should throw ReplicationException" + " Because namespace " + ns2 795 + " conflict with table " + tableName2); 796 } catch (Exception e) { 797 // OK 798 } 799 800 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 801 rpc2.setClusterKey(KEY_SECOND); 802 hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2); 803 804 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 805 Set<String> excludeNamespaces = new HashSet<String>(); 806 excludeNamespaces.add(ns1); 807 rpc2.setExcludeNamespaces(excludeNamespaces); 808 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 809 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 810 try { 811 Map<TableName, List<String>> excludeTableCfs = new HashMap<>(); 812 excludeTableCfs.put(tableName1, new ArrayList<>()); 813 rpc2.setExcludeTableCFsMap(excludeTableCfs); 814 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 815 fail("Should throw ReplicationException" + " Because exclude table " + tableName1 816 + " conflict with exclude namespace " + ns1); 817 } catch (Exception e) { 818 // OK 819 } 820 821 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 822 Map<TableName, List<String>> excludeTableCfs = new HashMap<>(); 823 excludeTableCfs.put(tableName2, new ArrayList<>()); 824 rpc2.setExcludeTableCFsMap(excludeTableCfs); 825 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 826 rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND); 827 try { 828 namespaces.clear(); 829 namespaces.add(ns2); 830 rpc2.setNamespaces(namespaces); 831 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2); 832 fail("Should throw ReplicationException" + " Because exclude namespace " + ns2 833 + " conflict with exclude table " + tableName2); 834 } catch (Exception e) { 835 // OK 836 } 837 838 hbaseAdmin.removeReplicationPeer(ID_ONE); 839 hbaseAdmin.removeReplicationPeer(ID_SECOND); 840 } 841 842 @Test 843 public void testPeerBandwidth() throws Exception { 844 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 845 rpc.setClusterKey(KEY_ONE); 846 hbaseAdmin.addReplicationPeer(ID_ONE, rpc); 847 848 rpc = admin.getPeerConfig(ID_ONE); 849 assertEquals(0, rpc.getBandwidth()); 850 851 rpc.setBandwidth(2097152); 852 admin.updatePeerConfig(ID_ONE, rpc); 853 854 assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); 855 admin.removePeer(ID_ONE); 856 } 857 858 @Test 859 public void testPeerClusterKey() throws Exception { 860 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 861 builder.setClusterKey(KEY_ONE); 862 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 863 864 try { 865 builder.setClusterKey(KEY_SECOND); 866 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 867 fail("Change cluster key on an existing peer is not allowed"); 868 } catch (Exception e) { 869 // OK 870 } 871 } 872 873 @Test 874 public void testPeerReplicationEndpointImpl() throws Exception { 875 ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); 876 builder.setClusterKey(KEY_ONE); 877 builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()); 878 hbaseAdmin.addReplicationPeer(ID_ONE, builder.build()); 879 880 try { 881 builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()); 882 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 883 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 884 } catch (Exception e) { 885 // OK 886 } 887 888 try { 889 builder = ReplicationPeerConfig.newBuilder(); 890 builder.setClusterKey(KEY_ONE); 891 hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build()); 892 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 893 } catch (Exception e) { 894 // OK 895 } 896 897 builder = ReplicationPeerConfig.newBuilder(); 898 builder.setClusterKey(KEY_SECOND); 899 hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build()); 900 901 try { 902 builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()); 903 hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build()); 904 fail("Change replication endpoint implementation class on an existing peer is not allowed"); 905 } catch (Exception e) { 906 // OK 907 } 908 } 909}