001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.hamcrest.CoreMatchers.startsWith; 023import static org.junit.Assert.assertEquals; 024import static org.junit.Assert.assertFalse; 025import static org.junit.Assert.assertNotNull; 026import static org.junit.Assert.assertNull; 027import static org.junit.Assert.assertThat; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.UUID; 039import java.util.concurrent.CompletionException; 040import java.util.concurrent.ExecutionException; 041import org.apache.hadoop.hbase.DoNotRetryIOException; 042import org.apache.hadoop.hbase.HBaseClassTestRule; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; 047import org.apache.hadoop.hbase.replication.ReplicationException; 048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 049import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 052import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 053import org.apache.hadoop.hbase.testclassification.ClientTests; 054import org.apache.hadoop.hbase.testclassification.LargeTests; 055import org.junit.After; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.runner.RunWith; 061import org.junit.runners.Parameterized; 062 063/** 064 * Class to test asynchronous replication admin operations. 065 */ 066@RunWith(Parameterized.class) 067@Category({ LargeTests.class, ClientTests.class }) 068public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); 073 074 private final String ID_ONE = "1"; 075 private final String KEY_ONE = "127.0.0.1:2181:/hbase"; 076 private final String ID_TWO = "2"; 077 private final String KEY_TWO = "127.0.0.1:2181:/hbase2"; 078 079 @BeforeClass 080 public static void setUpBeforeClass() throws Exception { 081 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 082 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 083 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 084 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 085 TEST_UTIL.startMiniCluster(); 086 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 087 } 088 089 @After 090 public void clearPeerAndQueues() throws IOException, ReplicationException { 091 try { 092 admin.removeReplicationPeer(ID_ONE).join(); 093 } catch (Exception e) { 094 } 095 try { 096 admin.removeReplicationPeer(ID_TWO).join(); 097 } catch (Exception e) { 098 } 099 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 100 .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); 101 for (ServerName serverName : queueStorage.getListOfReplicators()) { 102 for (String queue : queueStorage.getAllQueues(serverName)) { 103 queueStorage.removeQueue(serverName, queue); 104 } 105 } 106 } 107 108 @Test 109 public void testAddRemovePeer() throws Exception { 110 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 111 rpc1.setClusterKey(KEY_ONE); 112 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 113 rpc2.setClusterKey(KEY_TWO); 114 // Add a valid peer 115 admin.addReplicationPeer(ID_ONE, rpc1).join(); 116 // try adding the same (fails) 117 try { 118 admin.addReplicationPeer(ID_ONE, rpc1).join(); 119 fail("Test case should fail as adding a same peer."); 120 } catch (CompletionException e) { 121 // OK! 122 } 123 assertEquals(1, admin.listReplicationPeers().get().size()); 124 // Try to remove an inexisting peer 125 try { 126 admin.removeReplicationPeer(ID_TWO).join(); 127 fail("Test case should fail as removing a inexisting peer."); 128 } catch (CompletionException e) { 129 // OK! 130 } 131 assertEquals(1, admin.listReplicationPeers().get().size()); 132 // Add a second since multi-slave is supported 133 admin.addReplicationPeer(ID_TWO, rpc2).join(); 134 assertEquals(2, admin.listReplicationPeers().get().size()); 135 // Remove the first peer we added 136 admin.removeReplicationPeer(ID_ONE).join(); 137 assertEquals(1, admin.listReplicationPeers().get().size()); 138 admin.removeReplicationPeer(ID_TWO).join(); 139 assertEquals(0, admin.listReplicationPeers().get().size()); 140 } 141 142 @Test 143 public void testPeerConfig() throws Exception { 144 ReplicationPeerConfig config = new ReplicationPeerConfig(); 145 config.setClusterKey(KEY_ONE); 146 config.getConfiguration().put("key1", "value1"); 147 config.getConfiguration().put("key2", "value2"); 148 admin.addReplicationPeer(ID_ONE, config).join(); 149 150 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 151 assertEquals(1, peers.size()); 152 ReplicationPeerDescription peerOne = peers.get(0); 153 assertNotNull(peerOne); 154 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 155 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 156 157 admin.removeReplicationPeer(ID_ONE).join(); 158 } 159 160 @Test 161 public void testEnableDisablePeer() throws Exception { 162 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 163 rpc1.setClusterKey(KEY_ONE); 164 admin.addReplicationPeer(ID_ONE, rpc1).join(); 165 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 166 assertEquals(1, peers.size()); 167 assertTrue(peers.get(0).isEnabled()); 168 169 admin.disableReplicationPeer(ID_ONE).join(); 170 peers = admin.listReplicationPeers().get(); 171 assertEquals(1, peers.size()); 172 assertFalse(peers.get(0).isEnabled()); 173 admin.removeReplicationPeer(ID_ONE).join(); 174 } 175 176 @Test 177 public void testAppendPeerTableCFs() throws Exception { 178 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 179 rpc1.setClusterKey(KEY_ONE); 180 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 181 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 182 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 183 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 184 final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5"); 185 final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6"); 186 187 // Add a valid peer 188 admin.addReplicationPeer(ID_ONE, rpc1).join(); 189 rpc1.setReplicateAllUserTables(false); 190 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); 191 192 Map<TableName, List<String>> tableCFs = new HashMap<>(); 193 194 // append table t1 to replication 195 tableCFs.put(tableName1, null); 196 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 197 Map<TableName, List<String>> result = 198 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 199 assertEquals(1, result.size()); 200 assertEquals(true, result.containsKey(tableName1)); 201 assertNull(result.get(tableName1)); 202 203 // append table t2 to replication 204 tableCFs.clear(); 205 tableCFs.put(tableName2, null); 206 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 207 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 208 assertEquals(2, result.size()); 209 assertTrue("Should contain t1", result.containsKey(tableName1)); 210 assertTrue("Should contain t2", result.containsKey(tableName2)); 211 assertNull(result.get(tableName1)); 212 assertNull(result.get(tableName2)); 213 214 // append table column family: f1 of t3 to replication 215 tableCFs.clear(); 216 tableCFs.put(tableName3, new ArrayList<>()); 217 tableCFs.get(tableName3).add("f1"); 218 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 219 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 220 assertEquals(3, result.size()); 221 assertTrue("Should contain t1", result.containsKey(tableName1)); 222 assertTrue("Should contain t2", result.containsKey(tableName2)); 223 assertTrue("Should contain t3", result.containsKey(tableName3)); 224 assertNull(result.get(tableName1)); 225 assertNull(result.get(tableName2)); 226 assertEquals(1, result.get(tableName3).size()); 227 assertEquals("f1", result.get(tableName3).get(0)); 228 229 // append table column family: f1,f2 of t4 to replication 230 tableCFs.clear(); 231 tableCFs.put(tableName4, new ArrayList<>()); 232 tableCFs.get(tableName4).add("f1"); 233 tableCFs.get(tableName4).add("f2"); 234 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 235 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 236 assertEquals(4, result.size()); 237 assertTrue("Should contain t1", result.containsKey(tableName1)); 238 assertTrue("Should contain t2", result.containsKey(tableName2)); 239 assertTrue("Should contain t3", result.containsKey(tableName3)); 240 assertTrue("Should contain t4", result.containsKey(tableName4)); 241 assertNull(result.get(tableName1)); 242 assertNull(result.get(tableName2)); 243 assertEquals(1, result.get(tableName3).size()); 244 assertEquals("f1", result.get(tableName3).get(0)); 245 assertEquals(2, result.get(tableName4).size()); 246 assertEquals("f1", result.get(tableName4).get(0)); 247 assertEquals("f2", result.get(tableName4).get(1)); 248 249 // append "table5" => [], then append "table5" => ["f1"] 250 tableCFs.clear(); 251 tableCFs.put(tableName5, new ArrayList<>()); 252 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 253 tableCFs.clear(); 254 tableCFs.put(tableName5, new ArrayList<>()); 255 tableCFs.get(tableName5).add("f1"); 256 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 257 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 258 assertEquals(5, result.size()); 259 assertTrue("Should contain t5", result.containsKey(tableName5)); 260 // null means replication all cfs of tab5 261 assertNull(result.get(tableName5)); 262 263 // append "table6" => ["f1"], then append "table6" => [] 264 tableCFs.clear(); 265 tableCFs.put(tableName6, new ArrayList<>()); 266 tableCFs.get(tableName6).add("f1"); 267 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 268 tableCFs.clear(); 269 tableCFs.put(tableName6, new ArrayList<>()); 270 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 271 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 272 assertEquals(6, result.size()); 273 assertTrue("Should contain t6", result.containsKey(tableName6)); 274 // null means replication all cfs of tab6 275 assertNull(result.get(tableName6)); 276 277 admin.removeReplicationPeer(ID_ONE).join(); 278 } 279 280 @Test 281 public void testRemovePeerTableCFs() throws Exception { 282 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 283 rpc1.setClusterKey(KEY_ONE); 284 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 285 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 286 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 287 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 288 // Add a valid peer 289 admin.addReplicationPeer(ID_ONE, rpc1).join(); 290 rpc1.setReplicateAllUserTables(false); 291 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); 292 293 Map<TableName, List<String>> tableCFs = new HashMap<>(); 294 try { 295 tableCFs.put(tableName3, null); 296 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 297 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null"); 298 } catch (CompletionException e) { 299 assertTrue(e.getCause() instanceof ReplicationException); 300 } 301 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 302 303 tableCFs.clear(); 304 tableCFs.put(tableName1, null); 305 tableCFs.put(tableName2, new ArrayList<>()); 306 tableCFs.get(tableName2).add("cf1"); 307 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 308 try { 309 tableCFs.clear(); 310 tableCFs.put(tableName3, null); 311 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 312 fail("Test case should fail as removing table-cfs from a peer whose" + 313 " table-cfs didn't contain t3"); 314 } catch (CompletionException e) { 315 assertTrue(e.getCause() instanceof ReplicationException); 316 } 317 Map<TableName, List<String>> result = 318 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 319 assertEquals(2, result.size()); 320 assertTrue("Should contain t1", result.containsKey(tableName1)); 321 assertTrue("Should contain t2", result.containsKey(tableName2)); 322 assertNull(result.get(tableName1)); 323 assertEquals(1, result.get(tableName2).size()); 324 assertEquals("cf1", result.get(tableName2).get(0)); 325 326 try { 327 tableCFs.clear(); 328 tableCFs.put(tableName1, new ArrayList<>()); 329 tableCFs.get(tableName1).add("cf1"); 330 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 331 fail("Test case should fail, because table t1 didn't specify cfs in peer config"); 332 } catch (CompletionException e) { 333 assertTrue(e.getCause() instanceof ReplicationException); 334 } 335 tableCFs.clear(); 336 tableCFs.put(tableName1, null); 337 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 338 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 339 assertEquals(1, result.size()); 340 assertEquals(1, result.get(tableName2).size()); 341 assertEquals("cf1", result.get(tableName2).get(0)); 342 343 try { 344 tableCFs.clear(); 345 tableCFs.put(tableName2, null); 346 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 347 fail("Test case should fail, because table t2 hase specified cfs in peer config"); 348 } catch (CompletionException e) { 349 assertTrue(e.getCause() instanceof ReplicationException); 350 } 351 tableCFs.clear(); 352 tableCFs.put(tableName2, new ArrayList<>()); 353 tableCFs.get(tableName2).add("cf1"); 354 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 355 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 356 357 tableCFs.clear(); 358 tableCFs.put(tableName4, new ArrayList<>()); 359 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 360 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 361 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 362 363 admin.removeReplicationPeer(ID_ONE); 364 } 365 366 @Test 367 public void testSetPeerNamespaces() throws Exception { 368 String ns1 = "ns1"; 369 String ns2 = "ns2"; 370 371 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 372 rpc.setClusterKey(KEY_ONE); 373 admin.addReplicationPeer(ID_ONE, rpc).join(); 374 rpc.setReplicateAllUserTables(false); 375 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 376 377 // add ns1 and ns2 to peer config 378 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 379 Set<String> namespaces = new HashSet<>(); 380 namespaces.add(ns1); 381 namespaces.add(ns2); 382 rpc.setNamespaces(namespaces); 383 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 384 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 385 assertEquals(2, namespaces.size()); 386 assertTrue(namespaces.contains(ns1)); 387 assertTrue(namespaces.contains(ns2)); 388 389 // update peer config only contains ns1 390 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 391 namespaces = new HashSet<>(); 392 namespaces.add(ns1); 393 rpc.setNamespaces(namespaces); 394 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 395 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 396 assertEquals(1, namespaces.size()); 397 assertTrue(namespaces.contains(ns1)); 398 399 admin.removeReplicationPeer(ID_ONE).join(); 400 } 401 402 @Test 403 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 404 String ns1 = "ns1"; 405 String ns2 = "ns2"; 406 final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1"); 407 final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2"); 408 409 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 410 rpc.setClusterKey(KEY_ONE); 411 admin.addReplicationPeer(ID_ONE, rpc).join(); 412 rpc.setReplicateAllUserTables(false); 413 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 414 415 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 416 Set<String> namespaces = new HashSet<String>(); 417 namespaces.add(ns1); 418 rpc.setNamespaces(namespaces); 419 admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); 420 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 421 Map<TableName, List<String>> tableCfs = new HashMap<>(); 422 tableCfs.put(tableName1, new ArrayList<>()); 423 rpc.setTableCFsMap(tableCfs); 424 try { 425 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 426 fail( 427 "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); 428 } catch (CompletionException e) { 429 // OK 430 } 431 432 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 433 tableCfs.clear(); 434 tableCfs.put(tableName2, new ArrayList<>()); 435 rpc.setTableCFsMap(tableCfs); 436 admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); 437 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 438 namespaces.clear(); 439 namespaces.add(ns2); 440 rpc.setNamespaces(namespaces); 441 try { 442 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 443 fail( 444 "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); 445 } catch (CompletionException e) { 446 // OK 447 } 448 449 admin.removeReplicationPeer(ID_ONE).join(); 450 } 451 452 @Test 453 public void testPeerBandwidth() throws Exception { 454 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 455 rpc.setClusterKey(KEY_ONE); 456 457 admin.addReplicationPeer(ID_ONE, rpc).join(); 458 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 459 assertEquals(0, rpc.getBandwidth()); 460 461 rpc.setBandwidth(2097152); 462 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 463 assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth()); 464 465 admin.removeReplicationPeer(ID_ONE).join(); 466 } 467 468 @Test 469 public void testInvalidClusterKey() throws InterruptedException { 470 try { 471 admin.addReplicationPeer(ID_ONE, 472 ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get(); 473 fail(); 474 } catch (ExecutionException e) { 475 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 476 } 477 } 478 479 @Test 480 public void testInvalidReplicationEndpoint() throws InterruptedException { 481 try { 482 admin.addReplicationPeer(ID_ONE, 483 ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get(); 484 fail(); 485 } catch (ExecutionException e) { 486 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 487 assertThat(e.getCause().getMessage(), startsWith("Can not instantiate")); 488 } 489 } 490 491 public static final class DummyReplicationEndpoint extends BaseReplicationEndpoint { 492 493 @Override 494 public UUID getPeerUUID() { 495 return ctx.getClusterId(); 496 } 497 498 @Override 499 public boolean replicate(ReplicateContext replicateContext) { 500 return true; 501 } 502 503 @Override 504 public void start() { 505 startAsync(); 506 } 507 508 @Override 509 public void stop() { 510 stopAsync(); 511 } 512 513 @Override 514 protected void doStart() { 515 notifyStarted(); 516 } 517 518 @Override 519 protected void doStop() { 520 notifyStopped(); 521 } 522 } 523 524 @Test 525 public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException { 526 // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint 527 admin.addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder() 528 .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build()).get(); 529 530 // but we still need to check cluster key if we specify the default ReplicationEndpoint 531 try { 532 admin 533 .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder() 534 .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build()) 535 .get(); 536 fail(); 537 } catch (ExecutionException e) { 538 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 539 } 540 } 541}