001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; 021import static org.hamcrest.CoreMatchers.instanceOf; 022import static org.hamcrest.CoreMatchers.startsWith; 023import static org.hamcrest.MatcherAssert.assertThat; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.IOException; 032import java.util.ArrayList; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.concurrent.CompletionException; 039import java.util.concurrent.ExecutionException; 040import org.apache.hadoop.hbase.DoNotRetryIOException; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableName; 046import org.apache.hadoop.hbase.replication.ReplicationException; 047import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 051import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint; 052import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; 053import org.apache.hadoop.hbase.testclassification.ClientTests; 054import org.apache.hadoop.hbase.testclassification.LargeTests; 055import org.junit.After; 056import org.junit.BeforeClass; 057import org.junit.ClassRule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.runner.RunWith; 061import org.junit.runners.Parameterized; 062 063/** 064 * Class to test asynchronous replication admin operations. 065 */ 066@RunWith(Parameterized.class) 067@Category({ LargeTests.class, ClientTests.class }) 068public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { 069 070 @ClassRule 071 public static final HBaseClassTestRule CLASS_RULE = 072 HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); 073 074 private final String ID_ONE = "1"; 075 private static String KEY_ONE; 076 private final String ID_TWO = "2"; 077 private static String KEY_TWO; 078 079 @BeforeClass 080 public static void setUpBeforeClass() throws Exception { 081 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); 082 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); 083 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 084 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); 085 TEST_UTIL.startMiniCluster(); 086 KEY_ONE = TEST_UTIL.getClusterKey() + "-test1"; 087 KEY_TWO = TEST_UTIL.getClusterKey() + "-test2"; 088 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); 089 } 090 091 @After 092 public void clearPeerAndQueues() throws IOException, ReplicationException { 093 try { 094 admin.removeReplicationPeer(ID_ONE).join(); 095 } catch (Exception e) { 096 } 097 try { 098 admin.removeReplicationPeer(ID_TWO).join(); 099 } catch (Exception e) { 100 } 101 ReplicationQueueStorage queueStorage = ReplicationStorageFactory 102 .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration()); 103 for (ServerName serverName : queueStorage.getListOfReplicators()) { 104 for (String queue : queueStorage.getAllQueues(serverName)) { 105 queueStorage.removeQueue(serverName, queue); 106 } 107 } 108 } 109 110 @Test 111 public void testAddRemovePeer() throws Exception { 112 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 113 rpc1.setClusterKey(KEY_ONE); 114 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); 115 rpc2.setClusterKey(KEY_TWO); 116 // Add a valid peer 117 admin.addReplicationPeer(ID_ONE, rpc1).join(); 118 // try adding the same (fails) 119 try { 120 admin.addReplicationPeer(ID_ONE, rpc1).join(); 121 fail("Test case should fail as adding a same peer."); 122 } catch (CompletionException e) { 123 // OK! 124 } 125 assertEquals(1, admin.listReplicationPeers().get().size()); 126 // Try to remove an inexisting peer 127 try { 128 admin.removeReplicationPeer(ID_TWO).join(); 129 fail("Test case should fail as removing a inexisting peer."); 130 } catch (CompletionException e) { 131 // OK! 132 } 133 assertEquals(1, admin.listReplicationPeers().get().size()); 134 // Add a second since multi-slave is supported 135 admin.addReplicationPeer(ID_TWO, rpc2).join(); 136 assertEquals(2, admin.listReplicationPeers().get().size()); 137 // Remove the first peer we added 138 admin.removeReplicationPeer(ID_ONE).join(); 139 assertEquals(1, admin.listReplicationPeers().get().size()); 140 admin.removeReplicationPeer(ID_TWO).join(); 141 assertEquals(0, admin.listReplicationPeers().get().size()); 142 } 143 144 @Test 145 public void testPeerConfig() throws Exception { 146 ReplicationPeerConfig config = new ReplicationPeerConfig(); 147 config.setClusterKey(KEY_ONE); 148 config.getConfiguration().put("key1", "value1"); 149 config.getConfiguration().put("key2", "value2"); 150 admin.addReplicationPeer(ID_ONE, config).join(); 151 152 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 153 assertEquals(1, peers.size()); 154 ReplicationPeerDescription peerOne = peers.get(0); 155 assertNotNull(peerOne); 156 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1")); 157 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2")); 158 159 admin.removeReplicationPeer(ID_ONE).join(); 160 } 161 162 @Test 163 public void testEnableDisablePeer() throws Exception { 164 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 165 rpc1.setClusterKey(KEY_ONE); 166 admin.addReplicationPeer(ID_ONE, rpc1).join(); 167 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get(); 168 assertEquals(1, peers.size()); 169 assertTrue(peers.get(0).isEnabled()); 170 171 admin.disableReplicationPeer(ID_ONE).join(); 172 peers = admin.listReplicationPeers().get(); 173 assertEquals(1, peers.size()); 174 assertFalse(peers.get(0).isEnabled()); 175 admin.removeReplicationPeer(ID_ONE).join(); 176 } 177 178 @Test 179 public void testAppendPeerTableCFs() throws Exception { 180 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 181 rpc1.setClusterKey(KEY_ONE); 182 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 183 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 184 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 185 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 186 final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5"); 187 final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6"); 188 189 // Add a valid peer 190 admin.addReplicationPeer(ID_ONE, rpc1).join(); 191 rpc1.setReplicateAllUserTables(false); 192 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); 193 194 Map<TableName, List<String>> tableCFs = new HashMap<>(); 195 196 // append table t1 to replication 197 tableCFs.put(tableName1, null); 198 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 199 Map<TableName, List<String>> result = 200 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 201 assertEquals(1, result.size()); 202 assertEquals(true, result.containsKey(tableName1)); 203 assertNull(result.get(tableName1)); 204 205 // append table t2 to replication 206 tableCFs.clear(); 207 tableCFs.put(tableName2, null); 208 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 209 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 210 assertEquals(2, result.size()); 211 assertTrue("Should contain t1", result.containsKey(tableName1)); 212 assertTrue("Should contain t2", result.containsKey(tableName2)); 213 assertNull(result.get(tableName1)); 214 assertNull(result.get(tableName2)); 215 216 // append table column family: f1 of t3 to replication 217 tableCFs.clear(); 218 tableCFs.put(tableName3, new ArrayList<>()); 219 tableCFs.get(tableName3).add("f1"); 220 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 221 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 222 assertEquals(3, result.size()); 223 assertTrue("Should contain t1", result.containsKey(tableName1)); 224 assertTrue("Should contain t2", result.containsKey(tableName2)); 225 assertTrue("Should contain t3", result.containsKey(tableName3)); 226 assertNull(result.get(tableName1)); 227 assertNull(result.get(tableName2)); 228 assertEquals(1, result.get(tableName3).size()); 229 assertEquals("f1", result.get(tableName3).get(0)); 230 231 // append table column family: f1,f2 of t4 to replication 232 tableCFs.clear(); 233 tableCFs.put(tableName4, new ArrayList<>()); 234 tableCFs.get(tableName4).add("f1"); 235 tableCFs.get(tableName4).add("f2"); 236 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 237 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 238 assertEquals(4, result.size()); 239 assertTrue("Should contain t1", result.containsKey(tableName1)); 240 assertTrue("Should contain t2", result.containsKey(tableName2)); 241 assertTrue("Should contain t3", result.containsKey(tableName3)); 242 assertTrue("Should contain t4", result.containsKey(tableName4)); 243 assertNull(result.get(tableName1)); 244 assertNull(result.get(tableName2)); 245 assertEquals(1, result.get(tableName3).size()); 246 assertEquals("f1", result.get(tableName3).get(0)); 247 assertEquals(2, result.get(tableName4).size()); 248 assertEquals("f1", result.get(tableName4).get(0)); 249 assertEquals("f2", result.get(tableName4).get(1)); 250 251 // append "table5" => [], then append "table5" => ["f1"] 252 tableCFs.clear(); 253 tableCFs.put(tableName5, new ArrayList<>()); 254 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 255 tableCFs.clear(); 256 tableCFs.put(tableName5, new ArrayList<>()); 257 tableCFs.get(tableName5).add("f1"); 258 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 259 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 260 assertEquals(5, result.size()); 261 assertTrue("Should contain t5", result.containsKey(tableName5)); 262 // null means replication all cfs of tab5 263 assertNull(result.get(tableName5)); 264 265 // append "table6" => ["f1"], then append "table6" => [] 266 tableCFs.clear(); 267 tableCFs.put(tableName6, new ArrayList<>()); 268 tableCFs.get(tableName6).add("f1"); 269 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 270 tableCFs.clear(); 271 tableCFs.put(tableName6, new ArrayList<>()); 272 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 273 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 274 assertEquals(6, result.size()); 275 assertTrue("Should contain t6", result.containsKey(tableName6)); 276 // null means replication all cfs of tab6 277 assertNull(result.get(tableName6)); 278 279 admin.removeReplicationPeer(ID_ONE).join(); 280 } 281 282 @Test 283 public void testRemovePeerTableCFs() throws Exception { 284 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); 285 rpc1.setClusterKey(KEY_ONE); 286 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1"); 287 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2"); 288 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3"); 289 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4"); 290 // Add a valid peer 291 admin.addReplicationPeer(ID_ONE, rpc1).join(); 292 rpc1.setReplicateAllUserTables(false); 293 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join(); 294 295 Map<TableName, List<String>> tableCFs = new HashMap<>(); 296 try { 297 tableCFs.put(tableName3, null); 298 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 299 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null"); 300 } catch (CompletionException e) { 301 assertTrue(e.getCause() instanceof ReplicationException); 302 } 303 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 304 305 tableCFs.clear(); 306 tableCFs.put(tableName1, null); 307 tableCFs.put(tableName2, new ArrayList<>()); 308 tableCFs.get(tableName2).add("cf1"); 309 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 310 try { 311 tableCFs.clear(); 312 tableCFs.put(tableName3, null); 313 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 314 fail("Test case should fail as removing table-cfs from a peer whose" 315 + " table-cfs didn't contain t3"); 316 } catch (CompletionException e) { 317 assertTrue(e.getCause() instanceof ReplicationException); 318 } 319 Map<TableName, List<String>> result = 320 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 321 assertEquals(2, result.size()); 322 assertTrue("Should contain t1", result.containsKey(tableName1)); 323 assertTrue("Should contain t2", result.containsKey(tableName2)); 324 assertNull(result.get(tableName1)); 325 assertEquals(1, result.get(tableName2).size()); 326 assertEquals("cf1", result.get(tableName2).get(0)); 327 328 try { 329 tableCFs.clear(); 330 tableCFs.put(tableName1, new ArrayList<>()); 331 tableCFs.get(tableName1).add("cf1"); 332 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 333 fail("Test case should fail, because table t1 didn't specify cfs in peer config"); 334 } catch (CompletionException e) { 335 assertTrue(e.getCause() instanceof ReplicationException); 336 } 337 tableCFs.clear(); 338 tableCFs.put(tableName1, null); 339 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 340 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap(); 341 assertEquals(1, result.size()); 342 assertEquals(1, result.get(tableName2).size()); 343 assertEquals("cf1", result.get(tableName2).get(0)); 344 345 try { 346 tableCFs.clear(); 347 tableCFs.put(tableName2, null); 348 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 349 fail("Test case should fail, because table t2 hase specified cfs in peer config"); 350 } catch (CompletionException e) { 351 assertTrue(e.getCause() instanceof ReplicationException); 352 } 353 tableCFs.clear(); 354 tableCFs.put(tableName2, new ArrayList<>()); 355 tableCFs.get(tableName2).add("cf1"); 356 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 357 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 358 359 tableCFs.clear(); 360 tableCFs.put(tableName4, new ArrayList<>()); 361 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 362 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join(); 363 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap()); 364 365 admin.removeReplicationPeer(ID_ONE); 366 } 367 368 @Test 369 public void testSetPeerNamespaces() throws Exception { 370 String ns1 = "ns1"; 371 String ns2 = "ns2"; 372 373 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 374 rpc.setClusterKey(KEY_ONE); 375 admin.addReplicationPeer(ID_ONE, rpc).join(); 376 rpc.setReplicateAllUserTables(false); 377 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 378 379 // add ns1 and ns2 to peer config 380 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 381 Set<String> namespaces = new HashSet<>(); 382 namespaces.add(ns1); 383 namespaces.add(ns2); 384 rpc.setNamespaces(namespaces); 385 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 386 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 387 assertEquals(2, namespaces.size()); 388 assertTrue(namespaces.contains(ns1)); 389 assertTrue(namespaces.contains(ns2)); 390 391 // update peer config only contains ns1 392 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 393 namespaces = new HashSet<>(); 394 namespaces.add(ns1); 395 rpc.setNamespaces(namespaces); 396 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 397 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces(); 398 assertEquals(1, namespaces.size()); 399 assertTrue(namespaces.contains(ns1)); 400 401 admin.removeReplicationPeer(ID_ONE).join(); 402 } 403 404 @Test 405 public void testNamespacesAndTableCfsConfigConflict() throws Exception { 406 String ns1 = "ns1"; 407 String ns2 = "ns2"; 408 final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1"); 409 final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2"); 410 411 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 412 rpc.setClusterKey(KEY_ONE); 413 admin.addReplicationPeer(ID_ONE, rpc).join(); 414 rpc.setReplicateAllUserTables(false); 415 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 416 417 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 418 Set<String> namespaces = new HashSet<String>(); 419 namespaces.add(ns1); 420 rpc.setNamespaces(namespaces); 421 admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); 422 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 423 Map<TableName, List<String>> tableCfs = new HashMap<>(); 424 tableCfs.put(tableName1, new ArrayList<>()); 425 rpc.setTableCFsMap(tableCfs); 426 try { 427 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 428 fail( 429 "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1); 430 } catch (CompletionException e) { 431 // OK 432 } 433 434 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 435 tableCfs.clear(); 436 tableCfs.put(tableName2, new ArrayList<>()); 437 rpc.setTableCFsMap(tableCfs); 438 admin.updateReplicationPeerConfig(ID_ONE, rpc).get(); 439 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 440 namespaces.clear(); 441 namespaces.add(ns2); 442 rpc.setNamespaces(namespaces); 443 try { 444 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 445 fail( 446 "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2); 447 } catch (CompletionException e) { 448 // OK 449 } 450 451 admin.removeReplicationPeer(ID_ONE).join(); 452 } 453 454 @Test 455 public void testPeerBandwidth() throws Exception { 456 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 457 rpc.setClusterKey(KEY_ONE); 458 459 admin.addReplicationPeer(ID_ONE, rpc).join(); 460 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); 461 assertEquals(0, rpc.getBandwidth()); 462 463 rpc.setBandwidth(2097152); 464 admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); 465 assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth()); 466 467 admin.removeReplicationPeer(ID_ONE).join(); 468 } 469 470 @Test 471 public void testInvalidClusterKey() throws InterruptedException { 472 try { 473 admin.addReplicationPeer(ID_ONE, 474 ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get(); 475 fail(); 476 } catch (ExecutionException e) { 477 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 478 } 479 } 480 481 @Test 482 public void testClusterKeyWithTrailingSpace() throws Exception { 483 admin.addReplicationPeer(ID_ONE, 484 ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get(); 485 String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey(); 486 assertEquals(KEY_ONE, clusterKey); 487 } 488 489 @Test 490 public void testInvalidReplicationEndpoint() throws InterruptedException { 491 try { 492 admin.addReplicationPeer(ID_ONE, 493 ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get(); 494 fail(); 495 } catch (ExecutionException e) { 496 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 497 assertThat(e.getCause().getMessage(), startsWith("Can not instantiate")); 498 } 499 } 500 501 @Test 502 public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException { 503 // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint 504 admin 505 .addReplicationPeer(ID_ONE, 506 ReplicationPeerConfig.newBuilder() 507 .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build()) 508 .get(); 509 510 // but we still need to check cluster key if we specify the default ReplicationEndpoint 511 try { 512 admin 513 .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder() 514 .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build()) 515 .get(); 516 fail(); 517 } catch (ExecutionException e) { 518 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class)); 519 } 520 } 521 522 /* 523 * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist. 524 */ 525 @Test 526 public void testReplicationPeerNotFoundException() throws InterruptedException { 527 String dummyPeer = "dummy_peer"; 528 try { 529 admin.removeReplicationPeer(dummyPeer).get(); 530 fail(); 531 } catch (ExecutionException e) { 532 assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class)); 533 } 534 } 535}