001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.hamcrest.CoreMatchers.startsWith; 023import static org.hamcrest.MatcherAssert.assertThat; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.CompletionException; 039import java.util.concurrent.ExecutionException; 040import org.apache.hadoop.hbase.DoNotRetryIOException; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.replication.ReplicationException; 047import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 048import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; 049import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 052import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint; 053import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 054import org.apache.hadoop.hbase.testclassification.ClientTests; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.junit.After; 057import org.junit.BeforeClass; 058import org.junit.ClassRule; 059import org.junit.Test; 060import org.junit.experimental.categories.Category; 061import org.junit.runner.RunWith; 062import org.junit.runners.Parameterized; 063 064/** 065 * Class to test asynchronous replication admin operations. 066 */ 067@RunWith(Parameterized.class) 068@Category({ LargeTests.class, ClientTests.class }) 069public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { 070 071 @ClassRule 072 public static final HBaseClassTestRule CLASS_RULE = 073 HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); 074 075 private final String ID_ONE = "1"; 076 private static String KEY_ONE; 077 private final String ID_TWO = "2"; 078 private static String KEY_TWO; 079 080 @BeforeClass 081 public static void setUpBeforeClass() throws Exception { 082 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 083 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 084 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 085 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 086 TEST_UTIL.startMiniCluster(); 087 KEY_ONE = TEST_UTIL.getClusterKey() + "-test1"; 088 KEY_TWO = TEST_UTIL.getClusterKey() + "-test2"; 089 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 090 } 091 092 @After 093 public void clearPeerAndQueues() throws IOException, ReplicationException { 094 try { 095 admin.removeReplicationPeer(ID_ONE).join(); 096 } catch (Exception e) { 097 } 098 try { 099 admin.removeReplicationPeer(ID_TWO).join(); 100 } catch (Exception e) { 101 } 102 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 103 .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); 104 for (ServerName serverName : queueStorage.getListOfReplicators()) { 105 for (String queue : queueStorage.getAllQueues(serverName)) { 106 queueStorage.removeQueue(serverName, queue); 107 } 108 } 109 } 110 111 @Test 112 public void testAddRemovePeer() throws Exception { 113 ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); 114 ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(); 115 // Add a valid peer 116 admin.addReplicationPeer(ID_ONE, rpc1).join(); 117 // try adding the same (fails) 118 try { 119 admin.addReplicationPeer(ID_ONE, rpc1).join(); 120 fail("Test case should fail as adding a same peer."); 121 } catch (CompletionException e) { 122 // OK! 123 } 124 assertEquals(1, admin.listReplicationPeers().get().size()); 125 // Try to remove an inexisting peer 126 try { 127 admin.removeReplicationPeer(ID_TWO).join(); 128 fail("Test case should fail as removing a inexisting peer."); 129 } catch (CompletionException e) { 130 // OK! 131 } 132 assertEquals(1, admin.listReplicationPeers().get().size()); 133 // Add a second since multi-slave is supported 134 admin.addReplicationPeer(ID_TWO, rpc2).join(); 135 assertEquals(2, admin.listReplicationPeers().get().size()); 136 // Remove the first peer we added 137 admin.removeReplicationPeer(ID_ONE).join(); 138 assertEquals(1, admin.listReplicationPeers().get().size()); 139 admin.removeReplicationPeer(ID_TWO).join(); 140 assertEquals(0, admin.listReplicationPeers().get().size()); 141 } 142 143 @Test 144 public void testPeerConfig() throws Exception { 145 ReplicationPeerConfig config = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE) 146 .putConfiguration("key1", "value1").putConfiguration("key2", "value2").build(); 147 admin.addReplicationPeer(ID_ONE, config).join(); 148 149 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 150 assertEquals(1, peers.size()); 151 ReplicationPeerDescription peerOne = peers.get(0); 152 assertNotNull(peerOne); 153 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 154 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 155 156 admin.removeReplicationPeer(ID_ONE).join(); 157 } 158 159 @Test 160 public void testEnableDisablePeer() throws Exception { 161 ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(); 162 admin.addReplicationPeer(ID_ONE, rpc1).join(); 163 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 164 assertEquals(1, peers.size()); 165 assertTrue(peers.get(0).isEnabled()); 166 167 admin.disableReplicationPeer(ID_ONE).join(); 168 peers = admin.listReplicationPeers().get(); 169 assertEquals(1, peers.size()); 170 assertFalse(peers.get(0).isEnabled()); 171 admin.removeReplicationPeer(ID_ONE).join(); 172 } 173 174 @Test 175 public void testAppendPeerTableCFs() throws Exception { 176 ReplicationPeerConfigBuilder rpcBuilder = 177 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 178 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 179 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 180 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 181 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 182 final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5"); 183 final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6"); 184 185 // Add a valid peer 186 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 187 rpcBuilder.setReplicateAllUserTables(false); 188 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 189 190 Map<TableName, List<String>> tableCFs = new HashMap<>(); 191 192 // append table t1 to replication 193 tableCFs.put(tableName1, null); 194 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 195 Map<TableName, List<String>> result = 196 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 197 assertEquals(1, result.size()); 198 assertEquals(true, result.containsKey(tableName1)); 199 assertNull(result.get(tableName1)); 200 201 // append table t2 to replication 202 tableCFs.clear(); 203 tableCFs.put(tableName2, null); 204 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 205 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 206 assertEquals(2, result.size()); 207 assertTrue("Should contain t1", result.containsKey(tableName1)); 208 assertTrue("Should contain t2", result.containsKey(tableName2)); 209 assertNull(result.get(tableName1)); 210 assertNull(result.get(tableName2)); 211 212 // append table column family: f1 of t3 to replication 213 tableCFs.clear(); 214 tableCFs.put(tableName3, new ArrayList<>()); 215 tableCFs.get(tableName3).add("f1"); 216 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 217 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 218 assertEquals(3, result.size()); 219 assertTrue("Should contain t1", result.containsKey(tableName1)); 220 assertTrue("Should contain t2", result.containsKey(tableName2)); 221 assertTrue("Should contain t3", result.containsKey(tableName3)); 222 assertNull(result.get(tableName1)); 223 assertNull(result.get(tableName2)); 224 assertEquals(1, result.get(tableName3).size()); 225 assertEquals("f1", result.get(tableName3).get(0)); 226 227 // append table column family: f1,f2 of t4 to replication 228 tableCFs.clear(); 229 tableCFs.put(tableName4, new ArrayList<>()); 230 tableCFs.get(tableName4).add("f1"); 231 tableCFs.get(tableName4).add("f2"); 232 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 233 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 234 assertEquals(4, result.size()); 235 assertTrue("Should contain t1", result.containsKey(tableName1)); 236 assertTrue("Should contain t2", result.containsKey(tableName2)); 237 assertTrue("Should contain t3", result.containsKey(tableName3)); 238 assertTrue("Should contain t4", result.containsKey(tableName4)); 239 assertNull(result.get(tableName1)); 240 assertNull(result.get(tableName2)); 241 assertEquals(1, result.get(tableName3).size()); 242 assertEquals("f1", result.get(tableName3).get(0)); 243 assertEquals(2, result.get(tableName4).size()); 244 assertEquals("f1", result.get(tableName4).get(0)); 245 assertEquals("f2", result.get(tableName4).get(1)); 246 247 // append "table5" => [], then append "table5" => ["f1"] 248 tableCFs.clear(); 249 tableCFs.put(tableName5, new ArrayList<>()); 250 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 251 tableCFs.clear(); 252 tableCFs.put(tableName5, new ArrayList<>()); 253 tableCFs.get(tableName5).add("f1"); 254 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 255 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 256 assertEquals(5, result.size()); 257 assertTrue("Should contain t5", result.containsKey(tableName5)); 258 // null means replication all cfs of tab5 259 assertNull(result.get(tableName5)); 260 261 // append "table6" => ["f1"], then append "table6" => [] 262 tableCFs.clear(); 263 tableCFs.put(tableName6, new ArrayList<>()); 264 tableCFs.get(tableName6).add("f1"); 265 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 266 tableCFs.clear(); 267 tableCFs.put(tableName6, new ArrayList<>()); 268 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 269 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 270 assertEquals(6, result.size()); 271 assertTrue("Should contain t6", result.containsKey(tableName6)); 272 // null means replication all cfs of tab6 273 assertNull(result.get(tableName6)); 274 275 admin.removeReplicationPeer(ID_ONE).join(); 276 } 277 278 @Test 279 public void testRemovePeerTableCFs() throws Exception { 280 ReplicationPeerConfigBuilder rpcBuilder = 281 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 282 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 283 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 284 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 285 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 286 // Add a valid peer 287 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 288 rpcBuilder.setReplicateAllUserTables(false); 289 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 290 291 Map<TableName, List<String>> tableCFs = new HashMap<>(); 292 try { 293 tableCFs.put(tableName3, null); 294 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 295 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null"); 296 } catch (CompletionException e) { 297 assertTrue(e.getCause() instanceof ReplicationException); 298 } 299 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 300 301 tableCFs.clear(); 302 tableCFs.put(tableName1, null); 303 tableCFs.put(tableName2, new ArrayList<>()); 304 tableCFs.get(tableName2).add("cf1"); 305 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 306 try { 307 tableCFs.clear(); 308 tableCFs.put(tableName3, null); 309 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 310 fail("Test case should fail as removing table-cfs from a peer whose" 311 + " table-cfs didn't contain t3"); 312 } catch (CompletionException e) { 313 assertTrue(e.getCause() instanceof ReplicationException); 314 } 315 Map<TableName, List<String>> result = 316 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 317 assertEquals(2, result.size()); 318 assertTrue("Should contain t1", result.containsKey(tableName1)); 319 assertTrue("Should contain t2", result.containsKey(tableName2)); 320 assertNull(result.get(tableName1)); 321 assertEquals(1, result.get(tableName2).size()); 322 assertEquals("cf1", result.get(tableName2).get(0)); 323 324 try { 325 tableCFs.clear(); 326 tableCFs.put(tableName1, new ArrayList<>()); 327 tableCFs.get(tableName1).add("cf1"); 328 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 329 fail("Test case should fail, because table t1 didn't specify cfs in peer config"); 330 } catch (CompletionException e) { 331 assertTrue(e.getCause() instanceof ReplicationException); 332 } 333 tableCFs.clear(); 334 tableCFs.put(tableName1, null); 335 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 336 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 337 assertEquals(1, result.size()); 338 assertEquals(1, result.get(tableName2).size()); 339 assertEquals("cf1", result.get(tableName2).get(0)); 340 341 try { 342 tableCFs.clear(); 343 tableCFs.put(tableName2, null); 344 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 345 fail("Test case should fail, because table t2 hase specified cfs in peer config"); 346 } catch (CompletionException e) { 347 assertTrue(e.getCause() instanceof ReplicationException); 348 } 349 tableCFs.clear(); 350 tableCFs.put(tableName2, new ArrayList<>()); 351 tableCFs.get(tableName2).add("cf1"); 352 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 353 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 354 355 tableCFs.clear(); 356 tableCFs.put(tableName4, new ArrayList<>()); 357 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 358 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 359 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 360 361 admin.removeReplicationPeer(ID_ONE); 362 } 363 364 @Test 365 public void testSetPeerNamespaces() throws Exception { 366 String ns1 = "ns1"; 367 String ns2 = "ns2"; 368 369 ReplicationPeerConfigBuilder rpcBuilder = 370 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 371 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 372 rpcBuilder.setReplicateAllUserTables(false); 373 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 374 375 // add ns1 and ns2 to peer config 376 Set<String> namespaces = new HashSet<>(); 377 namespaces.add(ns1); 378 namespaces.add(ns2); 379 rpcBuilder.setNamespaces(namespaces); 380 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 381 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 382 assertEquals(2, namespaces.size()); 383 assertTrue(namespaces.contains(ns1)); 384 assertTrue(namespaces.contains(ns2)); 385 386 // update peer config only contains ns1 387 namespaces = new HashSet<>(); 388 namespaces.add(ns1); 389 rpcBuilder.setNamespaces(namespaces); 390 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 391 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 392 assertEquals(1, namespaces.size()); 393 assertTrue(namespaces.contains(ns1)); 394 395 admin.removeReplicationPeer(ID_ONE).join(); 396 } 397 398 @Test 399 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 400 String ns1 = "ns1"; 401 String ns2 = "ns2"; 402 final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1"); 403 final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2"); 404 405 ReplicationPeerConfigBuilder rpcBuilder = 406 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 407 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 408 rpcBuilder.setReplicateAllUserTables(false); 409 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 410 411 Set<String> namespaces = new HashSet<String>(); 412 namespaces.add(ns1); 413 rpcBuilder.setNamespaces(namespaces); 414 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get(); 415 Map<TableName, List<String>> tableCfs = new HashMap<>(); 416 tableCfs.put(tableName1, new ArrayList<>()); 417 rpcBuilder.setTableCFsMap(tableCfs); 418 try { 419 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 420 fail( 421 "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); 422 } catch (CompletionException e) { 423 // OK 424 } 425 426 tableCfs.clear(); 427 tableCfs.put(tableName2, new ArrayList<>()); 428 rpcBuilder.setTableCFsMap(tableCfs); 429 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get(); 430 namespaces.clear(); 431 namespaces.add(ns2); 432 rpcBuilder.setNamespaces(namespaces); 433 try { 434 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 435 fail( 436 "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); 437 } catch (CompletionException e) { 438 // OK 439 } 440 441 admin.removeReplicationPeer(ID_ONE).join(); 442 } 443 444 @Test 445 public void testPeerBandwidth() throws Exception { 446 ReplicationPeerConfigBuilder rpcBuilder = 447 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE); 448 449 admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join(); 450 ; 451 assertEquals(0, admin.getReplicationPeerConfig(ID_ONE).get().getBandwidth()); 452 453 rpcBuilder.setBandwidth(2097152); 454 admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join(); 455 assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth()); 456 457 admin.removeReplicationPeer(ID_ONE).join(); 458 } 459 460 @Test 461 public void testInvalidClusterKey() throws InterruptedException { 462 try { 463 admin.addReplicationPeer(ID_ONE, 464 ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get(); 465 fail(); 466 } catch (ExecutionException e) { 467 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 468 } 469 } 470 471 @Test 472 public void testClusterKeyWithTrailingSpace() throws Exception { 473 admin.addReplicationPeer(ID_ONE, 474 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get(); 475 String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey(); 476 assertEquals(KEY_ONE, clusterKey); 477 } 478 479 @Test 480 public void testInvalidReplicationEndpoint() throws InterruptedException { 481 try { 482 admin.addReplicationPeer(ID_ONE, 483 ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get(); 484 fail(); 485 } catch (ExecutionException e) { 486 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 487 assertThat(e.getCause().getMessage(), startsWith("Can not instantiate")); 488 } 489 } 490 491 @Test 492 public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException { 493 // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint 494 admin 495 .addReplicationPeer(ID_ONE, 496 ReplicationPeerConfig.newBuilder() 497 .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build()) 498 .get(); 499 500 // but we still need to check cluster key if we specify the default ReplicationEndpoint 501 try { 502 admin 503 .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder() 504 .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build()) 505 .get(); 506 fail(); 507 } catch (ExecutionException e) { 508 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 509 } 510 } 511 512 /* 513 * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist. 514 */ 515 @Test 516 public void testReplicationPeerNotFoundException() throws InterruptedException { 517 String dummyPeer = "dummy_peer"; 518 try { 519 admin.removeReplicationPeer(dummyPeer).get(); 520 fail(); 521 } catch (ExecutionException e) { 522 assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class)); 523 } 524 } 525}