001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.CoreMatchers.startsWith;
023import static org.hamcrest.MatcherAssert.assertThat;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CompletionException;
039import java.util.concurrent.ExecutionException;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.replication.ReplicationException;
047import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
048import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
049import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
052import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
053import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
054import org.apache.hadoop.hbase.testclassification.ClientTests;
055import org.apache.hadoop.hbase.testclassification.LargeTests;
056import org.junit.After;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.runner.RunWith;
062import org.junit.runners.Parameterized;
063
064/**
065 * Class to test asynchronous replication admin operations.
066 */
067@RunWith(Parameterized.class)
068@Category({ LargeTests.class, ClientTests.class })
069public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
070
  // Class-level JUnit rule created by HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);

  // Peer id of the first replication peer used by the tests.
  private final String ID_ONE = "1";
  // Cluster key of the first peer; set in setUpBeforeClass() to the mini cluster key + "-test1".
  private static String KEY_ONE;
  // Peer id of the second replication peer used by the tests.
  private final String ID_TWO = "2";
  // Cluster key of the second peer; set in setUpBeforeClass() to the mini cluster key + "-test2".
  private static String KEY_TWO;
079
080  @BeforeClass
081  public static void setUpBeforeClass() throws Exception {
082    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
083    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
084    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
085    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
086    TEST_UTIL.startMiniCluster();
087    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
088    KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
089    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
090  }
091
092  @After
093  public void clearPeerAndQueues() throws IOException, ReplicationException {
094    try {
095      admin.removeReplicationPeer(ID_ONE).join();
096    } catch (Exception e) {
097    }
098    try {
099      admin.removeReplicationPeer(ID_TWO).join();
100    } catch (Exception e) {
101    }
102    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
103      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
104    for (ServerName serverName : queueStorage.getListOfReplicators()) {
105      for (String queue : queueStorage.getAllQueues(serverName)) {
106        queueStorage.removeQueue(serverName, queue);
107      }
108    }
109  }
110
111  @Test
112  public void testAddRemovePeer() throws Exception {
113    ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build();
114    ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build();
115    // Add a valid peer
116    admin.addReplicationPeer(ID_ONE, rpc1).join();
117    // try adding the same (fails)
118    try {
119      admin.addReplicationPeer(ID_ONE, rpc1).join();
120      fail("Test case should fail as adding a same peer.");
121    } catch (CompletionException e) {
122      // OK!
123    }
124    assertEquals(1, admin.listReplicationPeers().get().size());
125    // Try to remove an inexisting peer
126    try {
127      admin.removeReplicationPeer(ID_TWO).join();
128      fail("Test case should fail as removing a inexisting peer.");
129    } catch (CompletionException e) {
130      // OK!
131    }
132    assertEquals(1, admin.listReplicationPeers().get().size());
133    // Add a second since multi-slave is supported
134    admin.addReplicationPeer(ID_TWO, rpc2).join();
135    assertEquals(2, admin.listReplicationPeers().get().size());
136    // Remove the first peer we added
137    admin.removeReplicationPeer(ID_ONE).join();
138    assertEquals(1, admin.listReplicationPeers().get().size());
139    admin.removeReplicationPeer(ID_TWO).join();
140    assertEquals(0, admin.listReplicationPeers().get().size());
141  }
142
143  @Test
144  public void testPeerConfig() throws Exception {
145    ReplicationPeerConfig config = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE)
146      .putConfiguration("key1", "value1").putConfiguration("key2", "value2").build();
147    admin.addReplicationPeer(ID_ONE, config).join();
148
149    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
150    assertEquals(1, peers.size());
151    ReplicationPeerDescription peerOne = peers.get(0);
152    assertNotNull(peerOne);
153    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
154    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
155
156    admin.removeReplicationPeer(ID_ONE).join();
157  }
158
159  @Test
160  public void testEnableDisablePeer() throws Exception {
161    ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build();
162    admin.addReplicationPeer(ID_ONE, rpc1).join();
163    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
164    assertEquals(1, peers.size());
165    assertTrue(peers.get(0).isEnabled());
166
167    admin.disableReplicationPeer(ID_ONE).join();
168    peers = admin.listReplicationPeers().get();
169    assertEquals(1, peers.size());
170    assertFalse(peers.get(0).isEnabled());
171    admin.removeReplicationPeer(ID_ONE).join();
172  }
173
174  @Test
175  public void testAppendPeerTableCFs() throws Exception {
176    ReplicationPeerConfigBuilder rpcBuilder =
177      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
178    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
179    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
180    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
181    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
182    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
183    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
184
185    // Add a valid peer
186    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
187    rpcBuilder.setReplicateAllUserTables(false);
188    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
189
190    Map<TableName, List<String>> tableCFs = new HashMap<>();
191
192    // append table t1 to replication
193    tableCFs.put(tableName1, null);
194    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
195    Map<TableName, List<String>> result =
196      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
197    assertEquals(1, result.size());
198    assertEquals(true, result.containsKey(tableName1));
199    assertNull(result.get(tableName1));
200
201    // append table t2 to replication
202    tableCFs.clear();
203    tableCFs.put(tableName2, null);
204    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
205    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
206    assertEquals(2, result.size());
207    assertTrue("Should contain t1", result.containsKey(tableName1));
208    assertTrue("Should contain t2", result.containsKey(tableName2));
209    assertNull(result.get(tableName1));
210    assertNull(result.get(tableName2));
211
212    // append table column family: f1 of t3 to replication
213    tableCFs.clear();
214    tableCFs.put(tableName3, new ArrayList<>());
215    tableCFs.get(tableName3).add("f1");
216    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
217    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
218    assertEquals(3, result.size());
219    assertTrue("Should contain t1", result.containsKey(tableName1));
220    assertTrue("Should contain t2", result.containsKey(tableName2));
221    assertTrue("Should contain t3", result.containsKey(tableName3));
222    assertNull(result.get(tableName1));
223    assertNull(result.get(tableName2));
224    assertEquals(1, result.get(tableName3).size());
225    assertEquals("f1", result.get(tableName3).get(0));
226
227    // append table column family: f1,f2 of t4 to replication
228    tableCFs.clear();
229    tableCFs.put(tableName4, new ArrayList<>());
230    tableCFs.get(tableName4).add("f1");
231    tableCFs.get(tableName4).add("f2");
232    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
233    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
234    assertEquals(4, result.size());
235    assertTrue("Should contain t1", result.containsKey(tableName1));
236    assertTrue("Should contain t2", result.containsKey(tableName2));
237    assertTrue("Should contain t3", result.containsKey(tableName3));
238    assertTrue("Should contain t4", result.containsKey(tableName4));
239    assertNull(result.get(tableName1));
240    assertNull(result.get(tableName2));
241    assertEquals(1, result.get(tableName3).size());
242    assertEquals("f1", result.get(tableName3).get(0));
243    assertEquals(2, result.get(tableName4).size());
244    assertEquals("f1", result.get(tableName4).get(0));
245    assertEquals("f2", result.get(tableName4).get(1));
246
247    // append "table5" => [], then append "table5" => ["f1"]
248    tableCFs.clear();
249    tableCFs.put(tableName5, new ArrayList<>());
250    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
251    tableCFs.clear();
252    tableCFs.put(tableName5, new ArrayList<>());
253    tableCFs.get(tableName5).add("f1");
254    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
255    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
256    assertEquals(5, result.size());
257    assertTrue("Should contain t5", result.containsKey(tableName5));
258    // null means replication all cfs of tab5
259    assertNull(result.get(tableName5));
260
261    // append "table6" => ["f1"], then append "table6" => []
262    tableCFs.clear();
263    tableCFs.put(tableName6, new ArrayList<>());
264    tableCFs.get(tableName6).add("f1");
265    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
266    tableCFs.clear();
267    tableCFs.put(tableName6, new ArrayList<>());
268    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
269    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
270    assertEquals(6, result.size());
271    assertTrue("Should contain t6", result.containsKey(tableName6));
272    // null means replication all cfs of tab6
273    assertNull(result.get(tableName6));
274
275    admin.removeReplicationPeer(ID_ONE).join();
276  }
277
278  @Test
279  public void testRemovePeerTableCFs() throws Exception {
280    ReplicationPeerConfigBuilder rpcBuilder =
281      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
282    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
283    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
284    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
285    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
286    // Add a valid peer
287    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
288    rpcBuilder.setReplicateAllUserTables(false);
289    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
290
291    Map<TableName, List<String>> tableCFs = new HashMap<>();
292    try {
293      tableCFs.put(tableName3, null);
294      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
295      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
296    } catch (CompletionException e) {
297      assertTrue(e.getCause() instanceof ReplicationException);
298    }
299    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
300
301    tableCFs.clear();
302    tableCFs.put(tableName1, null);
303    tableCFs.put(tableName2, new ArrayList<>());
304    tableCFs.get(tableName2).add("cf1");
305    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
306    try {
307      tableCFs.clear();
308      tableCFs.put(tableName3, null);
309      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
310      fail("Test case should fail as removing table-cfs from a peer whose"
311        + " table-cfs didn't contain t3");
312    } catch (CompletionException e) {
313      assertTrue(e.getCause() instanceof ReplicationException);
314    }
315    Map<TableName, List<String>> result =
316      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
317    assertEquals(2, result.size());
318    assertTrue("Should contain t1", result.containsKey(tableName1));
319    assertTrue("Should contain t2", result.containsKey(tableName2));
320    assertNull(result.get(tableName1));
321    assertEquals(1, result.get(tableName2).size());
322    assertEquals("cf1", result.get(tableName2).get(0));
323
324    try {
325      tableCFs.clear();
326      tableCFs.put(tableName1, new ArrayList<>());
327      tableCFs.get(tableName1).add("cf1");
328      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
329      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
330    } catch (CompletionException e) {
331      assertTrue(e.getCause() instanceof ReplicationException);
332    }
333    tableCFs.clear();
334    tableCFs.put(tableName1, null);
335    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
336    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
337    assertEquals(1, result.size());
338    assertEquals(1, result.get(tableName2).size());
339    assertEquals("cf1", result.get(tableName2).get(0));
340
341    try {
342      tableCFs.clear();
343      tableCFs.put(tableName2, null);
344      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
345      fail("Test case should fail, because table t2 hase specified cfs in peer config");
346    } catch (CompletionException e) {
347      assertTrue(e.getCause() instanceof ReplicationException);
348    }
349    tableCFs.clear();
350    tableCFs.put(tableName2, new ArrayList<>());
351    tableCFs.get(tableName2).add("cf1");
352    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
353    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
354
355    tableCFs.clear();
356    tableCFs.put(tableName4, new ArrayList<>());
357    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
358    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
359    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
360
361    admin.removeReplicationPeer(ID_ONE);
362  }
363
364  @Test
365  public void testSetPeerNamespaces() throws Exception {
366    String ns1 = "ns1";
367    String ns2 = "ns2";
368
369    ReplicationPeerConfigBuilder rpcBuilder =
370      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
371    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
372    rpcBuilder.setReplicateAllUserTables(false);
373    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
374
375    // add ns1 and ns2 to peer config
376    Set<String> namespaces = new HashSet<>();
377    namespaces.add(ns1);
378    namespaces.add(ns2);
379    rpcBuilder.setNamespaces(namespaces);
380    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
381    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
382    assertEquals(2, namespaces.size());
383    assertTrue(namespaces.contains(ns1));
384    assertTrue(namespaces.contains(ns2));
385
386    // update peer config only contains ns1
387    namespaces = new HashSet<>();
388    namespaces.add(ns1);
389    rpcBuilder.setNamespaces(namespaces);
390    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
391    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
392    assertEquals(1, namespaces.size());
393    assertTrue(namespaces.contains(ns1));
394
395    admin.removeReplicationPeer(ID_ONE).join();
396  }
397
398  @Test
399  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
400    String ns1 = "ns1";
401    String ns2 = "ns2";
402    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
403    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
404
405    ReplicationPeerConfigBuilder rpcBuilder =
406      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
407    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
408    rpcBuilder.setReplicateAllUserTables(false);
409    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
410
411    Set<String> namespaces = new HashSet<String>();
412    namespaces.add(ns1);
413    rpcBuilder.setNamespaces(namespaces);
414    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get();
415    Map<TableName, List<String>> tableCfs = new HashMap<>();
416    tableCfs.put(tableName1, new ArrayList<>());
417    rpcBuilder.setTableCFsMap(tableCfs);
418    try {
419      admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
420      fail(
421        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
422    } catch (CompletionException e) {
423      // OK
424    }
425
426    tableCfs.clear();
427    tableCfs.put(tableName2, new ArrayList<>());
428    rpcBuilder.setTableCFsMap(tableCfs);
429    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get();
430    namespaces.clear();
431    namespaces.add(ns2);
432    rpcBuilder.setNamespaces(namespaces);
433    try {
434      admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
435      fail(
436        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
437    } catch (CompletionException e) {
438      // OK
439    }
440
441    admin.removeReplicationPeer(ID_ONE).join();
442  }
443
444  @Test
445  public void testPeerBandwidth() throws Exception {
446    ReplicationPeerConfigBuilder rpcBuilder =
447      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
448
449    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
450    ;
451    assertEquals(0, admin.getReplicationPeerConfig(ID_ONE).get().getBandwidth());
452
453    rpcBuilder.setBandwidth(2097152);
454    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
455    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
456
457    admin.removeReplicationPeer(ID_ONE).join();
458  }
459
460  @Test
461  public void testInvalidClusterKey() throws InterruptedException {
462    try {
463      admin.addReplicationPeer(ID_ONE,
464        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
465      fail();
466    } catch (ExecutionException e) {
467      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
468    }
469  }
470
471  @Test
472  public void testClusterKeyWithTrailingSpace() throws Exception {
473    admin.addReplicationPeer(ID_ONE,
474      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get();
475    String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey();
476    assertEquals(KEY_ONE, clusterKey);
477  }
478
479  @Test
480  public void testInvalidReplicationEndpoint() throws InterruptedException {
481    try {
482      admin.addReplicationPeer(ID_ONE,
483        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
484      fail();
485    } catch (ExecutionException e) {
486      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
487      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
488    }
489  }
490
491  @Test
492  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
493    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
494    admin
495      .addReplicationPeer(ID_ONE,
496        ReplicationPeerConfig.newBuilder()
497          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
498      .get();
499
500    // but we still need to check cluster key if we specify the default ReplicationEndpoint
501    try {
502      admin
503        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
504          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
505        .get();
506      fail();
507    } catch (ExecutionException e) {
508      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
509    }
510  }
511
512  /*
513   * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist.
514   */
515  @Test
516  public void testReplicationPeerNotFoundException() throws InterruptedException {
517    String dummyPeer = "dummy_peer";
518    try {
519      admin.removeReplicationPeer(dummyPeer).get();
520      fail();
521    } catch (ExecutionException e) {
522      assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class));
523    }
524  }
525}