/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Unit testing of ReplicationAdmin.
 * <p>
 * All tests share one mini cluster started in {@link #setUpBeforeClass()} and use the peer ids
 * {@code ID_ONE} / {@code ID_SECOND}; {@link #cleanupPeer()} removes both after every test so a
 * failing test does not leave peers behind for the next one.
 */
@Category({MediumTests.class, ClientTests.class})
public class TestReplicationAdmin {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationAdmin.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);

  private final static HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();

  private final String ID_ONE = "1";
  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
  private final String ID_SECOND = "2";
  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";

  private static ReplicationAdmin admin;
  private static Admin hbaseAdmin;

  @Rule
  public TestName name = new TestName();

  /**
   * Starts the shared mini cluster and creates the two admin handles used by every test.
   * @throws java.lang.Exception if the cluster or an admin handle cannot be created
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    TEST_UTIL.startMiniCluster();
    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
    hbaseAdmin = TEST_UTIL.getAdmin();
  }

  /**
   * Closes the ReplicationAdmin (if it was created) and shuts the mini cluster down.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (admin != null) {
      admin.close();
    }
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Best-effort removal of both test peers after each test.
   */
  @After
  public void cleanupPeer() {
    removePeerQuietly(ID_ONE);
    removePeerQuietly(ID_SECOND);
  }

  /**
   * Removes the given peer, ignoring any failure: the peer is usually already gone because the
   * test removed it itself.
   */
  private void removePeerQuietly(String peerId) {
    try {
      hbaseAdmin.removeReplicationPeer(peerId);
    } catch (Exception ignored) {
      LOG.debug("Replication peer {} may already be removed", peerId);
    }
  }

  /**
   * Simple testing of adding and removing peers, basically shows that
   * all interactions with ZK work
   * @throws Exception
   */
  @Test
  public void testAddRemovePeer() throws Exception {
    ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
    rpc1.setClusterKey(KEY_ONE);
    ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
    rpc2.setClusterKey(KEY_SECOND);
    // Add a valid peer
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
    // try adding the same (fails)
    try {
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
      // fail() throws AssertionError, which the catch (Exception) below does not swallow.
      fail("Adding a peer with an already used id should fail");
    } catch (Exception e) {
      // OK!
    }
    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
    // Try to remove an inexisting peer
    try {
      hbaseAdmin.removeReplicationPeer(ID_SECOND);
      fail("Removing a non-existent peer should fail");
    } catch (Exception e) {
      // OK!
    }
    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
    // Add a second since multi-slave is supported. Any exception propagates and fails the test
    // with its original stack trace instead of a bare fail().
    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
    assertEquals(2, hbaseAdmin.listReplicationPeers().size());
    // Remove the first peer we added
    hbaseAdmin.removeReplicationPeer(ID_ONE);
    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
    hbaseAdmin.removeReplicationPeer(ID_SECOND);
    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
  }

  /**
   * Checks that the enabled flag passed to addReplicationPeer is what the peer listing reports.
   */
  @Test
  public void testAddPeerWithState() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
    hbaseAdmin.removeReplicationPeer(ID_ONE);

    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_SECOND);
    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
    hbaseAdmin.removeReplicationPeer(ID_SECOND);
  }

  /**
   * Tests that the peer configuration used by ReplicationAdmin contains all
   * the peer's properties.
   */
  @Test
  public void testPeerConfig() throws Exception {
    ReplicationPeerConfig config = new ReplicationPeerConfig();
    config.setClusterKey(KEY_ONE);
    config.getConfiguration().put("key1", "value1");
    config.getConfiguration().put("key2", "value2");
    hbaseAdmin.addReplicationPeer(ID_ONE, config);

    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
    assertEquals(1, peers.size());
    ReplicationPeerDescription peerOne = peers.get(0);
    assertNotNull(peerOne);
    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * Checks that a peer cannot be added while a replication queue (regular or recovered) for the
   * same peer id is still present.
   */
  @Test
  public void testAddPeerWithUnDeletedQueues() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_SECOND);
    Configuration conf = TEST_UTIL.getConfiguration();
    ZKWatcher zkw = new ZKWatcher(conf, "Test HBaseAdmin", null);
    try {
      ReplicationQueues repQueues =
          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw));
      repQueues.init("server1");
      try {
        // add queue for ID_ONE
        repQueues.addLog(ID_ONE, "file1");
        try {
          admin.addPeer(ID_ONE, rpc1, null);
          fail("Adding a peer while a queue for it still exists should fail");
        } catch (Exception e) {
          // OK!
        }
        repQueues.removeQueue(ID_ONE);
        assertEquals(0, repQueues.getAllQueues().size());

        // add recovered queue for ID_ONE
        repQueues.addLog(ID_ONE + "-server2", "file1");
        try {
          admin.addPeer(ID_ONE, rpc2, null);
          fail("Adding a peer while a recovered queue for it still exists should fail");
        } catch (Exception e) {
          // OK!
        }
      } finally {
        // Always drop the queues: a leftover queue would make later addPeer(ID_ONE) calls in
        // other tests fail.
        repQueues.removeAllQueues();
      }
    } finally {
      // Close the watcher even when an assertion above fails.
      zkw.close();
    }
  }

  /**
   * basic checks that when we add a peer that it is enabled, and that we can disable
   * @throws Exception
   */
  @Test
  public void testEnableDisable() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    admin.addPeer(ID_ONE, rpc1, null);
    assertEquals(1, admin.getPeersCount());
    assertTrue(admin.getPeerState(ID_ONE));
    admin.disablePeer(ID_ONE);

    assertFalse(admin.getPeerState(ID_ONE));
    // NOTE(review): this only tolerates ReplicationPeerNotFoundException, it does not require
    // it. If getPeerState is guaranteed to throw for an unknown peer, add a fail() after the
    // call -- confirm against ReplicationAdmin#getPeerState first.
    try {
      admin.getPeerState(ID_SECOND);
    } catch (ReplicationPeerNotFoundException e) {
      // OK!
    }
    admin.removePeer(ID_ONE);
  }

  /**
   * Appends table-cfs to a peer step by step and checks the accumulated table-cfs config after
   * each step, including the cases where an empty cf list is appended before/after a specific cf.
   */
  @Test
  public void testAppendPeerTableCFs() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");

    // Add a valid peer
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    // Update peer config, not replicate all user tables
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);

    Map<TableName, List<String>> tableCFs = new HashMap<>();
    tableCFs.put(tableName1, null);
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    Map<TableName, List<String>> result =
        ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
    assertEquals(1, result.size());
    assertTrue(result.containsKey(tableName1));
    assertNull(result.get(tableName1));

    // append table t2 to replication
    tableCFs.clear();
    tableCFs.put(tableName2, null);
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));

    // append table column family: f1 of t3 to replication
    tableCFs.clear();
    tableCFs.put(tableName3, new ArrayList<>());
    tableCFs.get(tableName3).add("f1");
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
    assertEquals(3, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertTrue("Should contain t3", result.containsKey(tableName3));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));
    assertEquals(1, result.get(tableName3).size());
    assertEquals("f1", result.get(tableName3).get(0));

    // append table column families: f1 and f2 of t4 to replication
    tableCFs.clear();
    tableCFs.put(tableName4, new ArrayList<>());
    tableCFs.get(tableName4).add("f1");
    tableCFs.get(tableName4).add("f2");
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
    assertEquals(4, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertTrue("Should contain t3", result.containsKey(tableName3));
    assertTrue("Should contain t4", result.containsKey(tableName4));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));
    assertEquals(1, result.get(tableName3).size());
    assertEquals("f1", result.get(tableName3).get(0));
    assertEquals(2, result.get(tableName4).size());
    assertEquals("f1", result.get(tableName4).get(0));
    assertEquals("f2", result.get(tableName4).get(1));

    // append "table5" => [], then append "table5" => ["f1"]
    tableCFs.clear();
    tableCFs.put(tableName5, new ArrayList<>());
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    tableCFs.clear();
    tableCFs.put(tableName5, new ArrayList<>());
    tableCFs.get(tableName5).add("f1");
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
    assertEquals(5, result.size());
    assertTrue("Should contain t5", result.containsKey(tableName5));
    // null means replication all cfs of tab5
    assertNull(result.get(tableName5));

    // append "table6" => ["f1"], then append "table6" => []
    tableCFs.clear();
    tableCFs.put(tableName6, new ArrayList<>());
    tableCFs.get(tableName6).add("f1");
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    tableCFs.clear();
    tableCFs.put(tableName6, new ArrayList<>());
    admin.appendPeerTableCFs(ID_ONE, tableCFs);
    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
    assertEquals(6, result.size());
    assertTrue("Should contain t6", result.containsKey(tableName6));
    // null means replication all cfs of tab6
    assertNull(result.get(tableName6));

    admin.removePeer(ID_ONE);
  }

  /**
   * Removes table-cfs from a peer and checks both the rejected removals (expected to raise
   * ReplicationException) and the resulting table-cfs config after the accepted ones.
   */
  @Test
  public void testRemovePeerTableCFs() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");

    // Add a valid peer
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    // Update peer config, not replicate all user tables
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);

    // Removing from a peer that has no table-cfs at all is rejected
    Map<TableName, List<String>> tableCFs = new HashMap<>();
    try {
      tableCFs.put(tableName3, null);
      admin.removePeerTableCFs(ID_ONE, tableCFs);
      fail("Expected ReplicationException: peer has no table-cfs config yet");
    } catch (ReplicationException e) {
      // OK
    }
    assertNull(admin.getPeerTableCFs(ID_ONE));

    tableCFs.clear();
    tableCFs.put(tableName1, null);
    tableCFs.put(tableName2, new ArrayList<>());
    tableCFs.get(tableName2).add("cf1");
    admin.setPeerTableCFs(ID_ONE, tableCFs);
    // Removing a table that is not part of the config is rejected
    try {
      tableCFs.clear();
      tableCFs.put(tableName3, null);
      admin.removePeerTableCFs(ID_ONE, tableCFs);
      fail("Expected ReplicationException: " + tableName3 + " is not in the table-cfs config");
    } catch (ReplicationException e) {
      // OK
    }
    Map<TableName, List<String>> result =
        ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertNull(result.get(tableName1));
    assertEquals(1, result.get(tableName2).size());
    assertEquals("cf1", result.get(tableName2).get(0));

    // t1 is configured with all cfs (null); removing a single cf from it is rejected
    try {
      tableCFs.clear();
      tableCFs.put(tableName1, new ArrayList<>());
      tableCFs.get(tableName1).add("f1");
      admin.removePeerTableCFs(ID_ONE, tableCFs);
      fail("Expected ReplicationException: cannot remove a single cf from " + tableName1);
    } catch (ReplicationException e) {
      // OK
    }
    tableCFs.clear();
    tableCFs.put(tableName1, null);
    admin.removePeerTableCFs(ID_ONE, tableCFs);
    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
    assertEquals(1, result.size());
    assertEquals(1, result.get(tableName2).size());
    assertEquals("cf1", result.get(tableName2).get(0));

    // t2 is configured with cf1 only; removing it as a whole table (null) is rejected
    try {
      tableCFs.clear();
      tableCFs.put(tableName2, null);
      admin.removePeerTableCFs(ID_ONE, tableCFs);
      fail("Expected ReplicationException: cannot remove all cfs of " + tableName2);
    } catch (ReplicationException e) {
      // OK
    }
    tableCFs.clear();
    tableCFs.put(tableName2, new ArrayList<>());
    tableCFs.get(tableName2).add("cf1");
    admin.removePeerTableCFs(ID_ONE, tableCFs);
    assertNull(admin.getPeerTableCFs(ID_ONE));

    tableCFs.clear();
    tableCFs.put(tableName4, new ArrayList<>());
    admin.setPeerTableCFs(ID_ONE, tableCFs);
    admin.removePeerTableCFs(ID_ONE, tableCFs);
    assertNull(admin.getPeerTableCFs(ID_ONE));

    admin.removePeer(ID_ONE);
  }

  /**
   * Sets the namespaces of a non replicate-all peer twice and checks that each update replaces
   * the previously stored set.
   */
  @Test
  public void testSetPeerNamespaces() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    namespaces.add(ns2);
    rpc.setNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
    assertEquals(2, namespaces.size());
    assertTrue(namespaces.contains(ns1));
    assertTrue(namespaces.contains(ns2));

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
    assertEquals(1, namespaces.size());
    assertTrue(namespaces.contains(ns1));

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * Checks that the replicate-all flag is true for a freshly added peer and that it round-trips
   * through updateReplicationPeerConfig in both directions.
   */
  @Test
  public void testSetReplicateAllUserTables() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertTrue(rpc.replicateAllUserTables());

    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertFalse(rpc.replicateAllUserTables());

    rpc.setReplicateAllUserTables(true);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertTrue(rpc.replicateAllUserTables());

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * Sets the exclude-namespaces of a replicate-all peer twice and checks that each update
   * replaces the previously stored set.
   */
  @Test
  public void testPeerExcludeNamespaces() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertTrue(rpc.replicateAllUserTables());

    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    namespaces.add(ns2);
    rpc.setExcludeNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
    assertEquals(2, namespaces.size());
    assertTrue(namespaces.contains(ns1));
    assertTrue(namespaces.contains(ns2));

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setExcludeNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
    assertEquals(1, namespaces.size());
    assertTrue(namespaces.contains(ns1));

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * Sets the exclude-table-cfs map of a replicate-all peer several times and checks what is read
   * back after each update.
   */
  @Test
  public void testPeerExcludeTableCFs() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    TableName tab1 = TableName.valueOf("t1");
    TableName tab2 = TableName.valueOf("t2");
    TableName tab3 = TableName.valueOf("t3");
    TableName tab4 = TableName.valueOf("t4");

    // Add a valid peer
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    assertTrue(rpc.replicateAllUserTables());

    Map<TableName, List<String>> tableCFs = new HashMap<>();
    tableCFs.put(tab1, null);
    rpc.setExcludeTableCFsMap(tableCFs);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    Map<TableName, List<String>> result =
        hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
    assertEquals(1, result.size());
    assertTrue(result.containsKey(tab1));
    assertNull(result.get(tab1));

    tableCFs.put(tab2, new ArrayList<>());
    tableCFs.get(tab2).add("f1");
    rpc.setExcludeTableCFsMap(tableCFs);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tab1));
    assertTrue("Should contain t2", result.containsKey(tab2));
    assertNull(result.get(tab1));
    assertEquals(1, result.get(tab2).size());
    assertEquals("f1", result.get(tab2).get(0));

    tableCFs.clear();
    tableCFs.put(tab3, new ArrayList<>());
    tableCFs.put(tab4, new ArrayList<>());
    tableCFs.get(tab4).add("f1");
    tableCFs.get(tab4).add("f2");
    rpc.setExcludeTableCFsMap(tableCFs);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
    assertEquals(2, result.size());
    assertTrue("Should contain t3", result.containsKey(tab3));
    assertTrue("Should contain t4", result.containsKey(tab4));
    // the empty cf list stored for t3 is read back as null
    assertNull(result.get(tab3));
    assertEquals(2, result.get(tab4).size());
    assertEquals("f1", result.get(tab4).get(0));
    assertEquals("f2", result.get(tab4).get(1));

    hbaseAdmin.removeReplicationPeer(ID_ONE);
  }

  /**
   * Checks which combinations of the replicate-all flag with namespaces / table-cfs /
   * exclude-namespaces / exclude-table-cfs are rejected when adding a peer, and that the two
   * consistent combinations are accepted.
   */
  @Test
  public void testPeerConfigConflict() throws Exception {
    // Default replicate_all flag is true
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);

    String ns1 = "ns1";
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);

    TableName tab1 = TableName.valueOf("ns2:tabl");
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tab1, new ArrayList<>());

    try {
      rpc.setNamespaces(namespaces);
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
      fail("Should throw Exception."
          + " When replicate all flag is true, no need to config namespaces");
    } catch (IOException e) {
      // OK
      rpc.setNamespaces(null);
    }

    try {
      rpc.setTableCFsMap(tableCfs);
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
      fail("Should throw Exception."
          + " When replicate all flag is true, no need to config table-cfs");
    } catch (IOException e) {
      // OK
      rpc.setTableCFsMap(null);
    }

    // Set replicate_all flag to false
    rpc.setReplicateAllUserTables(false);
    try {
      rpc.setExcludeNamespaces(namespaces);
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
      fail("Should throw Exception."
          + " When replicate all flag is false, no need to config exclude namespaces");
    } catch (IOException e) {
      // OK
      rpc.setExcludeNamespaces(null);
    }

    try {
      rpc.setExcludeTableCFsMap(tableCfs);
      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
      fail("Should throw Exception."
          + " When replicate all flag is false, no need to config exclude table-cfs");
    } catch (IOException e) {
      // OK
      rpc.setExcludeTableCFsMap(null);
    }

    rpc.setNamespaces(namespaces);
    rpc.setTableCFsMap(tableCfs);
    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    // Default replicate_all flag is true
    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_SECOND);
    rpc2.setExcludeNamespaces(namespaces);
    rpc2.setExcludeTableCFsMap(tableCfs);
    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
    // table-cfs config
    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);

    hbaseAdmin.removeReplicationPeer(ID_ONE);
    hbaseAdmin.removeReplicationPeer(ID_SECOND);
  }

  /**
   * Checks that a peer config is rejected when a namespace and a table of that same namespace
   * are configured together, both for the include settings (peer ID_ONE, replicate-all false)
   * and for the exclude settings (peer ID_SECOND, replicate-all true).
   */
  @Test
  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";
    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    rpc.setReplicateAllUserTables(false);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setNamespaces(namespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    try {
      Map<TableName, List<String>> tableCfs = new HashMap<>();
      tableCfs.put(tableName1, new ArrayList<>());
      rpc.setTableCFsMap(tableCfs);
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
      fail("Should throw ReplicationException" + " Because table " + tableName1
          + " conflict with namespace " + ns1);
    } catch (Exception e) {
      // OK
    }

    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName2, new ArrayList<>());
    rpc.setTableCFsMap(tableCfs);
    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
    try {
      namespaces.clear();
      namespaces.add(ns2);
      rpc.setNamespaces(namespaces);
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
      fail("Should throw ReplicationException" + " Because namespace " + ns2
          + " conflict with table " + tableName2);
    } catch (Exception e) {
      // OK
    }

    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_SECOND);
    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);

    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
    Set<String> excludeNamespaces = new HashSet<>();
    excludeNamespaces.add(ns1);
    rpc2.setExcludeNamespaces(excludeNamespaces);
    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
    try {
      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
      excludeTableCfs.put(tableName1, new ArrayList<>());
      rpc2.setExcludeTableCFsMap(excludeTableCfs);
      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
          + " conflict with exclude namespace " + ns1);
    } catch (Exception e) {
      // OK
    }

    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
    excludeTableCfs.put(tableName2, new ArrayList<>());
    rpc2.setExcludeTableCFsMap(excludeTableCfs);
    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
    try {
      namespaces.clear();
      namespaces.add(ns2);
      // Must be the *exclude* namespaces: this case is about exclude namespace ns2 clashing with
      // exclude table ns2:... . Setting the include namespaces here would be rejected on a
      // replicate-all peer for an unrelated reason and hide a regression in the exclude check.
      rpc2.setExcludeNamespaces(namespaces);
      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
          + " conflict with exclude table " + tableName2);
    } catch (Exception e) {
      // OK
    }

    hbaseAdmin.removeReplicationPeer(ID_ONE);
    hbaseAdmin.removeReplicationPeer(ID_SECOND);
  }

  /**
   * Checks that a new peer reports bandwidth 0 and that an updated bandwidth is read back.
   */
  @Test
  public void testPeerBandwidth() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);

    rpc = admin.getPeerConfig(ID_ONE);
    assertEquals(0, rpc.getBandwidth());

    rpc.setBandwidth(2097152);
    admin.updatePeerConfig(ID_ONE, rpc);

    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
    admin.removePeer(ID_ONE);
  }

  /**
   * Checks that the cluster key of an existing peer cannot be changed. The peer itself is
   * removed by {@link #cleanupPeer()}.
   */
  @Test
  public void testPeerClusterKey() throws Exception {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    builder.setClusterKey(KEY_ONE);
    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());

    try {
      builder.setClusterKey(KEY_SECOND);
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
      fail("Change cluster key on an existing peer is not allowed");
    } catch (Exception e) {
      // OK
    }
  }

  /**
   * Checks that the replication endpoint implementation of an existing peer cannot be changed,
   * dropped, or set after the fact. The peers are removed by {@link #cleanupPeer()}.
   */
  @Test
  public void testPeerReplicationEndpointImpl() throws Exception {
    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
    builder.setClusterKey(KEY_ONE);
    builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());

    // switch to a different endpoint implementation
    try {
      builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
      fail("Change replication endpoint implementation class on an existing peer is not allowed");
    } catch (Exception e) {
      // OK
    }

    // drop the endpoint implementation (fresh builder without one)
    try {
      builder = ReplicationPeerConfig.newBuilder();
      builder.setClusterKey(KEY_ONE);
      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
      fail("Change replication endpoint implementation class on an existing peer is not allowed");
    } catch (Exception e) {
      // OK
    }

    builder = ReplicationPeerConfig.newBuilder();
    builder.setClusterKey(KEY_SECOND);
    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());

    // set an endpoint implementation on a peer that was created without one
    try {
      builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
      fail("Set replication endpoint implementation class on an existing peer is not allowed");
    } catch (Exception e) {
      // OK
    }
  }
}