/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Class to test asynchronous replication admin operations.
 * <p>
 * Every test registers its peers under {@link #ID_ONE} / {@link #ID_TWO}; {@link
 * #clearPeerAndQueues()} removes both ids and all replication queues after each test, so a test
 * that fails half-way does not leak peers into the next one.
 */
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);

  /** Peer id used by every test for its first (usually only) peer. */
  private static final String ID_ONE = "1";
  /** Cluster key for the first peer: the mini cluster key with a "-test1" suffix. */
  private static String KEY_ONE;
  /** Peer id used when a test needs a second peer. */
  private static final String ID_TWO = "2";
  /** Cluster key for the second peer: the mini cluster key with a "-test2" suffix. */
  private static String KEY_TWO;

  /**
   * Configures client timeouts and retries, starts the mini cluster, derives the two peer cluster
   * keys from the mini cluster key and opens the shared async connection.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
    TEST_UTIL.startMiniCluster();
    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
    KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  /**
   * Removes both test peers (ignoring failures) and then deletes every replication queue of every
   * replicator known to the queue storage.
   */
  @After
  public void clearPeerAndQueues() throws IOException, ReplicationException {
    removePeerQuietly(ID_ONE);
    removePeerQuietly(ID_TWO);
    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
    for (ServerName serverName : queueStorage.getListOfReplicators()) {
      for (String queue : queueStorage.getAllQueues(serverName)) {
        queueStorage.removeQueue(serverName, queue);
      }
    }
  }

  /** Removes the given peer and waits for completion, swallowing any failure. */
  private void removePeerQuietly(String peerId) {
    try {
      admin.removeReplicationPeer(peerId).join();
    } catch (Exception ignored) {
      // Best-effort cleanup: most tests have already removed the peer (or never added it), in
      // which case the removal is expected to fail and must not fail the test.
    }
  }

  /**
   * Adds and removes peers, checking the peer count after each step and that adding a duplicate id
   * or removing an unknown id completes exceptionally.
   */
  @Test
  public void testAddRemovePeer() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_TWO);
    // Add a valid peer
    admin.addReplicationPeer(ID_ONE, rpc1).join();
    // try adding the same (fails)
    try {
      admin.addReplicationPeer(ID_ONE, rpc1).join();
      fail("Test case should fail as adding a same peer.");
    } catch (CompletionException expected) {
      // OK!
    }
    assertEquals(1, admin.listReplicationPeers().get().size());
    // Try to remove an inexisting peer
    try {
      admin.removeReplicationPeer(ID_TWO).join();
      fail("Test case should fail as removing a inexisting peer.");
    } catch (CompletionException expected) {
      // OK!
    }
    assertEquals(1, admin.listReplicationPeers().get().size());
    // Add a second since multi-slave is supported
    admin.addReplicationPeer(ID_TWO, rpc2).join();
    assertEquals(2, admin.listReplicationPeers().get().size());
    // Remove the first peer we added
    admin.removeReplicationPeer(ID_ONE).join();
    assertEquals(1, admin.listReplicationPeers().get().size());
    admin.removeReplicationPeer(ID_TWO).join();
    assertEquals(0, admin.listReplicationPeers().get().size());
  }

  /**
   * Verifies that custom configuration entries set on a peer config are returned unchanged when
   * the peer is listed.
   */
  @Test
  public void testPeerConfig() throws Exception {
    ReplicationPeerConfig config = new ReplicationPeerConfig();
    config.setClusterKey(KEY_ONE);
    config.getConfiguration().put("key1", "value1");
    config.getConfiguration().put("key2", "value2");
    admin.addReplicationPeer(ID_ONE, config).join();

    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
    assertEquals(1, peers.size());
    ReplicationPeerDescription peerOne = peers.get(0);
    assertNotNull(peerOne);
    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));

    admin.removeReplicationPeer(ID_ONE).join();
  }

  /**
   * Verifies that a newly added peer is listed as enabled and is listed as disabled after
   * {@code disableReplicationPeer}.
   */
  @Test
  public void testEnableDisablePeer() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    admin.addReplicationPeer(ID_ONE, rpc1).join();
    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
    assertEquals(1, peers.size());
    assertTrue(peers.get(0).isEnabled());

    admin.disableReplicationPeer(ID_ONE).join();
    peers = admin.listReplicationPeers().get();
    assertEquals(1, peers.size());
    assertFalse(peers.get(0).isEnabled());
    admin.removeReplicationPeer(ID_ONE).join();
  }

  /**
   * Appends table-cfs entries to a peer one table at a time and checks the resulting table-cfs map
   * after each append, including the cases where an empty cf list and an explicit cf list are
   * appended for the same table (in either order), which both end up as a {@code null} cf list.
   */
  @Test
  public void testAppendPeerTableCFs() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");

    // Add a valid peer
    admin.addReplicationPeer(ID_ONE, rpc1).join();
    rpc1.setReplicateAllUserTables(false);
    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();

    Map<TableName, List<String>> tableCFs = new HashMap<>();

    // append table t1 to replication
    tableCFs.put(tableName1, null);
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    Map<TableName, List<String>> result =
      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(1, result.size());
    assertTrue(result.containsKey(tableName1));
    assertNull(result.get(tableName1));

    // append table t2 to replication
    tableCFs.clear();
    tableCFs.put(tableName2, null);
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));

    // append table column family: f1 of t3 to replication
    tableCFs.clear();
    tableCFs.put(tableName3, new ArrayList<>());
    tableCFs.get(tableName3).add("f1");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(3, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertTrue("Should contain t3", result.containsKey(tableName3));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));
    assertEquals(1, result.get(tableName3).size());
    assertEquals("f1", result.get(tableName3).get(0));

    // append table column family: f1,f2 of t4 to replication
    tableCFs.clear();
    tableCFs.put(tableName4, new ArrayList<>());
    tableCFs.get(tableName4).add("f1");
    tableCFs.get(tableName4).add("f2");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(4, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertTrue("Should contain t3", result.containsKey(tableName3));
    assertTrue("Should contain t4", result.containsKey(tableName4));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));
    assertEquals(1, result.get(tableName3).size());
    assertEquals("f1", result.get(tableName3).get(0));
    assertEquals(2, result.get(tableName4).size());
    assertEquals("f1", result.get(tableName4).get(0));
    assertEquals("f2", result.get(tableName4).get(1));

    // append "table5" => [], then append "table5" => ["f1"]
    tableCFs.clear();
    tableCFs.put(tableName5, new ArrayList<>());
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    tableCFs.clear();
    tableCFs.put(tableName5, new ArrayList<>());
    tableCFs.get(tableName5).add("f1");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(5, result.size());
    assertTrue("Should contain t5", result.containsKey(tableName5));
    // null means replication all cfs of tab5
    assertNull(result.get(tableName5));

    // append "table6" => ["f1"], then append "table6" => []
    tableCFs.clear();
    tableCFs.put(tableName6, new ArrayList<>());
    tableCFs.get(tableName6).add("f1");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    tableCFs.clear();
    tableCFs.put(tableName6, new ArrayList<>());
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(6, result.size());
    assertTrue("Should contain t6", result.containsKey(tableName6));
    // null means replication all cfs of tab6
    assertNull(result.get(tableName6));

    admin.removeReplicationPeer(ID_ONE).join();
  }

  /**
   * Removes table-cfs entries from a peer and checks both the rejected requests (each must fail
   * with a {@link ReplicationException} cause) and the table-cfs map left behind by the accepted
   * ones.
   */
  @Test
  public void testRemovePeerTableCFs() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
    // Add a valid peer
    admin.addReplicationPeer(ID_ONE, rpc1).join();
    rpc1.setReplicateAllUserTables(false);
    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();

    Map<TableName, List<String>> tableCFs = new HashMap<>();
    try {
      tableCFs.put(tableName3, null);
      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof ReplicationException);
    }
    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());

    tableCFs.clear();
    tableCFs.put(tableName1, null);
    tableCFs.put(tableName2, new ArrayList<>());
    tableCFs.get(tableName2).add("cf1");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    try {
      tableCFs.clear();
      tableCFs.put(tableName3, null);
      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
      fail("Test case should fail as removing table-cfs from a peer whose" +
        " table-cfs didn't contain t3");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof ReplicationException);
    }
    Map<TableName, List<String>> result =
      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertNull(result.get(tableName1));
    assertEquals(1, result.get(tableName2).size());
    assertEquals("cf1", result.get(tableName2).get(0));

    try {
      tableCFs.clear();
      tableCFs.put(tableName1, new ArrayList<>());
      tableCFs.get(tableName1).add("cf1");
      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof ReplicationException);
    }
    tableCFs.clear();
    tableCFs.put(tableName1, null);
    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(1, result.size());
    assertEquals(1, result.get(tableName2).size());
    assertEquals("cf1", result.get(tableName2).get(0));

    try {
      tableCFs.clear();
      tableCFs.put(tableName2, null);
      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
      fail("Test case should fail, because table t2 hase specified cfs in peer config");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof ReplicationException);
    }
    tableCFs.clear();
    tableCFs.put(tableName2, new ArrayList<>());
    tableCFs.get(tableName2).add("cf1");
    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());

    tableCFs.clear();
    tableCFs.put(tableName4, new ArrayList<>());
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());

    // Wait for the removal so that a failure is reported by this test instead of being dropped
    // with the un-joined future.
    admin.removeReplicationPeer(ID_ONE).join();
  }

  /**
   * Verifies that the namespace set of a peer can be set and later replaced through
   * {@code updateReplicationPeerConfig}.
   */
  @Test
  public void testSetPeerNamespaces() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    admin.addReplicationPeer(ID_ONE, rpc).join();
    rpc.setReplicateAllUserTables(false);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();

    // add ns1 and ns2 to peer config
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    namespaces.add(ns2);
    rpc.setNamespaces(namespaces);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
    assertEquals(2, namespaces.size());
    assertTrue(namespaces.contains(ns1));
    assertTrue(namespaces.contains(ns2));

    // update peer config only contains ns1
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setNamespaces(namespaces);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
    assertEquals(1, namespaces.size());
    assertTrue(namespaces.contains(ns1));

    admin.removeReplicationPeer(ID_ONE).join();
  }

  /**
   * Verifies that a peer config update is rejected when a table in the table-cfs map belongs to a
   * namespace that is already in the peer's namespace set, and vice versa.
   */
  @Test
  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";
    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    admin.addReplicationPeer(ID_ONE, rpc).join();
    rpc.setReplicateAllUserTables(false);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();

    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setNamespaces(namespaces);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName1, new ArrayList<>());
    rpc.setTableCFsMap(tableCfs);
    try {
      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
      fail(
        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
    } catch (CompletionException expected) {
      // OK
    }

    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    tableCfs.clear();
    tableCfs.put(tableName2, new ArrayList<>());
    rpc.setTableCFsMap(tableCfs);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    namespaces.clear();
    namespaces.add(ns2);
    rpc.setNamespaces(namespaces);
    try {
      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
      fail(
        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
    } catch (CompletionException expected) {
      // OK
    }

    admin.removeReplicationPeer(ID_ONE).join();
  }

  /**
   * Verifies that a new peer reports a bandwidth of 0 and that an updated bandwidth is returned by
   * a subsequent {@code getReplicationPeerConfig}.
   */
  @Test
  public void testPeerBandwidth() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);

    admin.addReplicationPeer(ID_ONE, rpc).join();
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    assertEquals(0, rpc.getBandwidth());

    rpc.setBandwidth(2097152);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());

    admin.removeReplicationPeer(ID_ONE).join();
  }

  /**
   * Verifies that adding a peer with a malformed cluster key fails with a
   * {@link DoNotRetryIOException} cause.
   */
  @Test
  public void testInvalidClusterKey() throws InterruptedException {
    try {
      admin.addReplicationPeer(ID_ONE,
        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
      fail();
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
    }
  }

  /**
   * Verifies that adding a peer with an unknown replication endpoint class name fails with a
   * {@link DoNotRetryIOException} whose message starts with "Can not instantiate".
   */
  @Test
  public void testInvalidReplicationEndpoint() throws InterruptedException {
    try {
      admin.addReplicationPeer(ID_ONE,
        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
      fail();
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
    }
  }

  /**
   * Verifies that a peer with a customized replication endpoint can be added without a cluster
   * key, while the default inter-cluster endpoint without a cluster key is rejected.
   */
  @Test
  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
    admin
      .addReplicationPeer(ID_ONE,
        ReplicationPeerConfig.newBuilder()
          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
      .get();

    // but we still need to check cluster key if we specify the default ReplicationEndpoint
    try {
      admin
        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
        .get();
      fail();
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
    }
  }
}