001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.hamcrest.Matchers.containsString; 023import static org.hamcrest.Matchers.instanceOf; 024import static org.hamcrest.Matchers.startsWith; 025import static org.junit.jupiter.api.Assertions.assertEquals; 026import static org.junit.jupiter.api.Assertions.assertFalse; 027import static org.junit.jupiter.api.Assertions.assertNotNull; 028import static org.junit.jupiter.api.Assertions.assertNull; 029import static org.junit.jupiter.api.Assertions.assertThrows; 030import static org.junit.jupiter.api.Assertions.assertTrue; 031import static org.junit.jupiter.api.Assertions.fail; 032 033import java.io.IOException; 034import java.util.ArrayList; 035import java.util.HashMap; 036import java.util.HashSet; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.concurrent.CompletionException; 041import java.util.concurrent.ExecutionException; 042import java.util.function.Supplier; 043import org.apache.hadoop.hbase.DoNotRetryIOException; 044import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.replication.ReplicationException; 049import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 050import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 051import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 052import org.apache.hadoop.hbase.replication.ReplicationQueueData; 053import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 054import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 055import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint; 056import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 057import org.apache.hadoop.hbase.testclassification.ClientTests; 058import org.apache.hadoop.hbase.testclassification.LargeTests; 059import org.junit.jupiter.api.AfterAll; 060import org.junit.jupiter.api.AfterEach; 061import org.junit.jupiter.api.BeforeAll; 062import org.junit.jupiter.api.Tag; 063import org.junit.jupiter.api.TestTemplate; 064 065/** 066 * Class to test asynchronous replication admin operations. 067 */ 068@Tag(LargeTests.TAG) 069@Tag(ClientTests.TAG) 070@HBaseParameterizedTestTemplate(name = "{index}: policy = {0}") 071public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { 072 073 private final String ID_ONE = "1"; 074 private static String KEY_ONE; 075 private final String ID_TWO = "2"; 076 private static String KEY_TWO; 077 078 public TestAsyncReplicationAdminApi(Supplier<AsyncAdmin> admin) { 079 super(admin); 080 } 081 082 @BeforeAll 083 public static void setUpBeforeClass() throws Exception { 084 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 085 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 086 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 087 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 088 TEST_UTIL.startMiniCluster(); 089 KEY_ONE = TEST_UTIL.getZkConnectionURI() + "-test1"; 090 KEY_TWO = TEST_UTIL.getZkConnectionURI() + "-test2"; 091 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 092 } 093 094 @AfterAll 095 public static void tearDownAfterClass() throws Exception { 096 TestAsyncAdminBase.tearDownAfterClass(); 097 } 098 099 @AfterEach 100 public void clearPeerAndQueues() throws IOException, ReplicationException { 101 try { 102 admin.removeReplicationPeer(ID_ONE).join(); 103 } catch (Exception e) { 104 } 105 try { 106 admin.removeReplicationPeer(ID_TWO).join(); 107 } catch (Exception e) { 108 } 109 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 110 .getReplicationQueueStorage(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration()); 111 for (ReplicationQueueData queueData : queueStorage.listAllQueues()) { 112 queueStorage.removeQueue(queueData.getId()); 113 } 114 admin.replicationPeerModificationSwitch(true).join(); 115 } 116 117 @TestTemplate 118 public void testAddRemovePeer() throws Exception { 119 ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); 120 ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(); 121 // Add a valid peer 122 admin.addReplicationPeer(ID_ONE, rpc1).join(); 123 // try adding the same (fails) 124 try { 125 admin.addReplicationPeer(ID_ONE, rpc1).join(); 126 fail("Test case should fail as adding a same peer."); 127 } catch (CompletionException e) { 128 // OK! 129 } 130 assertEquals(1, admin.listReplicationPeers().get().size()); 131 // Try to remove an inexisting peer 132 try { 133 admin.removeReplicationPeer(ID_TWO).join(); 134 fail("Test case should fail as removing a inexisting peer."); 135 } catch (CompletionException e) { 136 // OK! 137 } 138 assertEquals(1, admin.listReplicationPeers().get().size()); 139 // Add a second since multi-slave is supported 140 admin.addReplicationPeer(ID_TWO, rpc2).join(); 141 assertEquals(2, admin.listReplicationPeers().get().size()); 142 // Remove the first peer we added 143 admin.removeReplicationPeer(ID_ONE).join(); 144 assertEquals(1, admin.listReplicationPeers().get().size()); 145 admin.removeReplicationPeer(ID_TWO).join(); 146 assertEquals(0, admin.listReplicationPeers().get().size()); 147 } 148 149 @TestTemplate 150 public void testPeerConfig() throws Exception { 151 ReplicationPeerConfig config = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE) 152 .putConfiguration("key1", "value1").putConfiguration("key2", "value2").build(); 153 admin.addReplicationPeer(ID_ONE, config).join(); 154 155 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 156 assertEquals(1, peers.size()); 157 ReplicationPeerDescription peerOne = peers.get(0); 158 assertNotNull(peerOne); 159 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 160 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 161 162 admin.removeReplicationPeer(ID_ONE).join(); 163 } 164 165 @TestTemplate 166 public void testEnableDisablePeer() throws Exception { 167 ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); 168 admin.addReplicationPeer(ID_ONE, rpc1).join(); 169 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 170 assertEquals(1, peers.size()); 171 assertTrue(peers.get(0).isEnabled()); 172 173 admin.disableReplicationPeer(ID_ONE).join(); 174 peers = admin.listReplicationPeers().get(); 175 assertEquals(1, peers.size()); 176 assertFalse(peers.get(0).isEnabled()); 177 admin.removeReplicationPeer(ID_ONE).join(); 178 } 179 180 @TestTemplate 181 public void testAppendPeerTableCFs() throws Exception { 182 ReplicationPeerConfigBuilder rpcBuilder = 183 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 184 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 185 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 186 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 187 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 188 final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5"); 189 final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6"); 190 191 // Add a valid peer 192 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 193 rpcBuilder.setReplicateAllUserTables(false); 194 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 195 196 Map<TableName, List<String>> tableCFs = new HashMap<>(); 197 198 // append table t1 to replication 199 tableCFs.put(tableName1, null); 200 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 201 Map<TableName, List<String>> result = 202 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 203 assertEquals(1, result.size()); 204 assertEquals(true, result.containsKey(tableName1)); 205 assertNull(result.get(tableName1)); 206 207 // append table t2 to replication 208 tableCFs.clear(); 209 tableCFs.put(tableName2, null); 210 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 211 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 212 assertEquals(2, result.size()); 213 assertTrue(result.containsKey(tableName1), "Should contain t1"); 214 assertTrue(result.containsKey(tableName2), "Should contain t2"); 215 assertNull(result.get(tableName1)); 216 assertNull(result.get(tableName2)); 217 218 // append table column family: f1 of t3 to replication 219 tableCFs.clear(); 220 tableCFs.put(tableName3, new ArrayList<>()); 221 tableCFs.get(tableName3).add("f1"); 222 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 223 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 224 assertEquals(3, result.size()); 225 assertTrue(result.containsKey(tableName1), "Should contain t1"); 226 assertTrue(result.containsKey(tableName2), "Should contain t2"); 227 assertTrue(result.containsKey(tableName3), "Should contain t3"); 228 assertNull(result.get(tableName1)); 229 assertNull(result.get(tableName2)); 230 assertEquals(1, result.get(tableName3).size()); 231 assertEquals("f1", result.get(tableName3).get(0)); 232 233 // append table column family: f1,f2 of t4 to replication 234 tableCFs.clear(); 235 tableCFs.put(tableName4, new ArrayList<>()); 236 tableCFs.get(tableName4).add("f1"); 237 tableCFs.get(tableName4).add("f2"); 238 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 239 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 240 assertEquals(4, result.size()); 241 assertTrue(result.containsKey(tableName1), "Should contain t1"); 242 assertTrue(result.containsKey(tableName2), "Should contain t2"); 243 assertTrue(result.containsKey(tableName3), "Should contain t3"); 244 assertTrue(result.containsKey(tableName4), "Should contain t4"); 245 assertNull(result.get(tableName1)); 246 assertNull(result.get(tableName2)); 247 assertEquals(1, result.get(tableName3).size()); 248 assertEquals("f1", result.get(tableName3).get(0)); 249 assertEquals(2, result.get(tableName4).size()); 250 assertEquals("f1", result.get(tableName4).get(0)); 251 assertEquals("f2", result.get(tableName4).get(1)); 252 253 // append "table5" => [], then append "table5" => ["f1"] 254 tableCFs.clear(); 255 tableCFs.put(tableName5, new ArrayList<>()); 256 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 257 tableCFs.clear(); 258 tableCFs.put(tableName5, new ArrayList<>()); 259 tableCFs.get(tableName5).add("f1"); 260 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 261 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 262 assertEquals(5, result.size()); 263 assertTrue(result.containsKey(tableName5), "Should contain t5"); 264 // null means replication all cfs of tab5 265 assertNull(result.get(tableName5)); 266 267 // append "table6" => ["f1"], then append "table6" => [] 268 tableCFs.clear(); 269 tableCFs.put(tableName6, new ArrayList<>()); 270 tableCFs.get(tableName6).add("f1"); 271 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 272 tableCFs.clear(); 273 tableCFs.put(tableName6, new ArrayList<>()); 274 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 275 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 276 assertEquals(6, result.size()); 277 assertTrue(result.containsKey(tableName6), "Should contain t6"); 278 // null means replication all cfs of tab6 279 assertNull(result.get(tableName6)); 280 281 admin.removeReplicationPeer(ID_ONE).join(); 282 } 283 284 @TestTemplate 285 public void testRemovePeerTableCFs() throws Exception { 286 ReplicationPeerConfigBuilder rpcBuilder = 287 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 288 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 289 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 290 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 291 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 292 // Add a valid peer 293 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 294 rpcBuilder.setReplicateAllUserTables(false); 295 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 296 297 Map<TableName, List<String>> tableCFs = new HashMap<>(); 298 try { 299 tableCFs.put(tableName3, null); 300 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 301 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null"); 302 } catch (CompletionException e) { 303 assertTrue(e.getCause() instanceof ReplicationException); 304 } 305 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 306 307 tableCFs.clear(); 308 tableCFs.put(tableName1, null); 309 tableCFs.put(tableName2, new ArrayList<>()); 310 tableCFs.get(tableName2).add("cf1"); 311 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 312 try { 313 tableCFs.clear(); 314 tableCFs.put(tableName3, null); 315 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 316 fail("Test case should fail as removing table-cfs from a peer whose" 317 + " table-cfs didn't contain t3"); 318 } catch (CompletionException e) { 319 assertTrue(e.getCause() instanceof ReplicationException); 320 } 321 Map<TableName, List<String>> result = 322 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 323 assertEquals(2, result.size()); 324 assertTrue(result.containsKey(tableName1), "Should contain t1"); 325 assertTrue(result.containsKey(tableName2), "Should contain t2"); 326 assertNull(result.get(tableName1)); 327 assertEquals(1, result.get(tableName2).size()); 328 assertEquals("cf1", result.get(tableName2).get(0)); 329 330 try { 331 tableCFs.clear(); 332 tableCFs.put(tableName1, new ArrayList<>()); 333 tableCFs.get(tableName1).add("cf1"); 334 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 335 fail("Test case should fail, because table t1 didn't specify cfs in peer config"); 336 } catch (CompletionException e) { 337 assertTrue(e.getCause() instanceof ReplicationException); 338 } 339 tableCFs.clear(); 340 tableCFs.put(tableName1, null); 341 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 342 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 343 assertEquals(1, result.size()); 344 assertEquals(1, result.get(tableName2).size()); 345 assertEquals("cf1", result.get(tableName2).get(0)); 346 347 try { 348 tableCFs.clear(); 349 tableCFs.put(tableName2, null); 350 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 351 fail("Test case should fail, because table t2 hase specified cfs in peer config"); 352 } catch (CompletionException e) { 353 assertTrue(e.getCause() instanceof ReplicationException); 354 } 355 tableCFs.clear(); 356 tableCFs.put(tableName2, new ArrayList<>()); 357 tableCFs.get(tableName2).add("cf1"); 358 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 359 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 360 361 tableCFs.clear(); 362 tableCFs.put(tableName4, new ArrayList<>()); 363 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 364 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 365 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 366 367 admin.removeReplicationPeer(ID_ONE); 368 } 369 370 @TestTemplate 371 public void testSetPeerNamespaces() throws Exception { 372 String ns1 = "ns1"; 373 String ns2 = "ns2"; 374 375 ReplicationPeerConfigBuilder rpcBuilder = 376 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 377 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 378 rpcBuilder.setReplicateAllUserTables(false); 379 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 380 381 // add ns1 and ns2 to peer config 382 Set<String> namespaces = new HashSet<>(); 383 namespaces.add(ns1); 384 namespaces.add(ns2); 385 rpcBuilder.setNamespaces(namespaces); 386 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 387 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 388 assertEquals(2, namespaces.size()); 389 assertTrue(namespaces.contains(ns1)); 390 assertTrue(namespaces.contains(ns2)); 391 392 // update peer config only contains ns1 393 namespaces = new HashSet<>(); 394 namespaces.add(ns1); 395 rpcBuilder.setNamespaces(namespaces); 396 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 397 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 398 assertEquals(1, namespaces.size()); 399 assertTrue(namespaces.contains(ns1)); 400 401 admin.removeReplicationPeer(ID_ONE).join(); 402 } 403 404 @TestTemplate 405 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 406 String ns1 = "ns1"; 407 String ns2 = "ns2"; 408 final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1"); 409 final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2"); 410 411 ReplicationPeerConfigBuilder rpcBuilder = 412 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 413 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 414 rpcBuilder.setReplicateAllUserTables(false); 415 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 416 417 Set<String> namespaces = new HashSet<String>(); 418 namespaces.add(ns1); 419 rpcBuilder.setNamespaces(namespaces); 420 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get(); 421 Map<TableName, List<String>> tableCfs = new HashMap<>(); 422 tableCfs.put(tableName1, new ArrayList<>()); 423 rpcBuilder.setTableCFsMap(tableCfs); 424 try { 425 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 426 fail( 427 "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); 428 } catch (CompletionException e) { 429 // OK 430 } 431 432 tableCfs.clear(); 433 tableCfs.put(tableName2, new ArrayList<>()); 434 rpcBuilder.setTableCFsMap(tableCfs); 435 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get(); 436 namespaces.clear(); 437 namespaces.add(ns2); 438 rpcBuilder.setNamespaces(namespaces); 439 try { 440 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 441 fail( 442 "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); 443 } catch (CompletionException e) { 444 // OK 445 } 446 447 admin.removeReplicationPeer(ID_ONE).join(); 448 } 449 450 @TestTemplate 451 public void testPeerBandwidth() throws Exception { 452 ReplicationPeerConfigBuilder rpcBuilder = 453 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 454 455 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 456 ; 457 assertEquals(0, admin.getReplicationPeerConfig(ID_ONE).get().getBandwidth()); 458 459 rpcBuilder.setBandwidth(2097152); 460 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 461 assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth()); 462 463 admin.removeReplicationPeer(ID_ONE).join(); 464 } 465 466 @TestTemplate 467 public void testInvalidClusterKey() throws InterruptedException { 468 try { 469 admin.addReplicationPeer(ID_ONE, 470 ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get(); 471 fail(); 472 } catch (ExecutionException e) { 473 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 474 } 475 } 476 477 @TestTemplate 478 public void testClusterKeyWithTrailingSpace() throws Exception { 479 admin.addReplicationPeer(ID_ONE, 480 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get(); 481 String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey(); 482 assertEquals(KEY_ONE, clusterKey); 483 } 484 485 @TestTemplate 486 public void testInvalidReplicationEndpoint() throws InterruptedException { 487 try { 488 admin.addReplicationPeer(ID_ONE, 489 ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get(); 490 fail(); 491 } catch (ExecutionException e) { 492 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 493 assertThat(e.getCause().getMessage(), startsWith("Can not instantiate")); 494 } 495 } 496 497 @TestTemplate 498 public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException { 499 // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint 500 admin 501 .addReplicationPeer(ID_ONE, 502 ReplicationPeerConfig.newBuilder() 503 .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build()) 504 .get(); 505 506 // but we still need to check cluster key if we specify the default ReplicationEndpoint 507 try { 508 admin 509 .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder() 510 .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build()) 511 .get(); 512 fail(); 513 } catch (ExecutionException e) { 514 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 515 } 516 } 517 518 /** 519 * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist. 520 */ 521 @TestTemplate 522 public void testReplicationPeerNotFoundException() throws InterruptedException { 523 String dummyPeer = "dummy_peer"; 524 try { 525 admin.removeReplicationPeer(dummyPeer).get(); 526 fail(); 527 } catch (ExecutionException e) { 528 assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class)); 529 } 530 } 531 532 @TestTemplate 533 public void testReplicationPeerModificationSwitch() throws Exception { 534 assertTrue(admin.isReplicationPeerModificationEnabled().get()); 535 // disable modification, should returns true as it is enabled by default and the above 536 // assertion has confirmed it 537 assertTrue(admin.replicationPeerModificationSwitch(false).get()); 538 ExecutionException error = assertThrows(ExecutionException.class, () -> admin 539 .addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()) 540 .get()); 541 assertThat(error.getCause().getMessage(), 542 containsString("Replication peer modification disabled")); 543 // enable again, and the previous value should be false 544 assertFalse(admin.replicationPeerModificationSwitch(true).get()); 545 } 546}