001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.CoreMatchers.startsWith;
023import static org.hamcrest.MatcherAssert.assertThat;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CompletionException;
039import java.util.concurrent.ExecutionException;
040import org.apache.hadoop.hbase.DoNotRetryIOException;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.replication.ReplicationException;
047import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
051import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
052import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
053import org.apache.hadoop.hbase.testclassification.ClientTests;
054import org.apache.hadoop.hbase.testclassification.LargeTests;
055import org.junit.After;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.runner.RunWith;
061import org.junit.runners.Parameterized;
062
063/**
064 * Class to test asynchronous replication admin operations.
065 */
066@RunWith(Parameterized.class)
067@Category({ LargeTests.class, ClientTests.class })
068public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
069
  // Class-level JUnit rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
073
074  private final String ID_ONE = "1";
075  private static String KEY_ONE;
076  private final String ID_TWO = "2";
077  private static String KEY_TWO;
078
079  @BeforeClass
080  public static void setUpBeforeClass() throws Exception {
081    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
082    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
083    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
084    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
085    TEST_UTIL.startMiniCluster();
086    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
087    KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
088    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
089  }
090
091  @After
092  public void clearPeerAndQueues() throws IOException, ReplicationException {
093    try {
094      admin.removeReplicationPeer(ID_ONE).join();
095    } catch (Exception e) {
096    }
097    try {
098      admin.removeReplicationPeer(ID_TWO).join();
099    } catch (Exception e) {
100    }
101    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
102      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
103    for (ServerName serverName : queueStorage.getListOfReplicators()) {
104      for (String queue : queueStorage.getAllQueues(serverName)) {
105        queueStorage.removeQueue(serverName, queue);
106      }
107    }
108  }
109
110  @Test
111  public void testAddRemovePeer() throws Exception {
112    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
113    rpc1.setClusterKey(KEY_ONE);
114    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
115    rpc2.setClusterKey(KEY_TWO);
116    // Add a valid peer
117    admin.addReplicationPeer(ID_ONE, rpc1).join();
118    // try adding the same (fails)
119    try {
120      admin.addReplicationPeer(ID_ONE, rpc1).join();
121      fail("Test case should fail as adding a same peer.");
122    } catch (CompletionException e) {
123      // OK!
124    }
125    assertEquals(1, admin.listReplicationPeers().get().size());
126    // Try to remove an inexisting peer
127    try {
128      admin.removeReplicationPeer(ID_TWO).join();
129      fail("Test case should fail as removing a inexisting peer.");
130    } catch (CompletionException e) {
131      // OK!
132    }
133    assertEquals(1, admin.listReplicationPeers().get().size());
134    // Add a second since multi-slave is supported
135    admin.addReplicationPeer(ID_TWO, rpc2).join();
136    assertEquals(2, admin.listReplicationPeers().get().size());
137    // Remove the first peer we added
138    admin.removeReplicationPeer(ID_ONE).join();
139    assertEquals(1, admin.listReplicationPeers().get().size());
140    admin.removeReplicationPeer(ID_TWO).join();
141    assertEquals(0, admin.listReplicationPeers().get().size());
142  }
143
144  @Test
145  public void testPeerConfig() throws Exception {
146    ReplicationPeerConfig config = new ReplicationPeerConfig();
147    config.setClusterKey(KEY_ONE);
148    config.getConfiguration().put("key1", "value1");
149    config.getConfiguration().put("key2", "value2");
150    admin.addReplicationPeer(ID_ONE, config).join();
151
152    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
153    assertEquals(1, peers.size());
154    ReplicationPeerDescription peerOne = peers.get(0);
155    assertNotNull(peerOne);
156    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
157    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
158
159    admin.removeReplicationPeer(ID_ONE).join();
160  }
161
162  @Test
163  public void testEnableDisablePeer() throws Exception {
164    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
165    rpc1.setClusterKey(KEY_ONE);
166    admin.addReplicationPeer(ID_ONE, rpc1).join();
167    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
168    assertEquals(1, peers.size());
169    assertTrue(peers.get(0).isEnabled());
170
171    admin.disableReplicationPeer(ID_ONE).join();
172    peers = admin.listReplicationPeers().get();
173    assertEquals(1, peers.size());
174    assertFalse(peers.get(0).isEnabled());
175    admin.removeReplicationPeer(ID_ONE).join();
176  }
177
178  @Test
179  public void testAppendPeerTableCFs() throws Exception {
180    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
181    rpc1.setClusterKey(KEY_ONE);
182    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
183    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
184    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
185    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
186    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
187    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
188
189    // Add a valid peer
190    admin.addReplicationPeer(ID_ONE, rpc1).join();
191    rpc1.setReplicateAllUserTables(false);
192    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
193
194    Map<TableName, List<String>> tableCFs = new HashMap<>();
195
196    // append table t1 to replication
197    tableCFs.put(tableName1, null);
198    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
199    Map<TableName, List<String>> result =
200      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
201    assertEquals(1, result.size());
202    assertEquals(true, result.containsKey(tableName1));
203    assertNull(result.get(tableName1));
204
205    // append table t2 to replication
206    tableCFs.clear();
207    tableCFs.put(tableName2, null);
208    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
209    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
210    assertEquals(2, result.size());
211    assertTrue("Should contain t1", result.containsKey(tableName1));
212    assertTrue("Should contain t2", result.containsKey(tableName2));
213    assertNull(result.get(tableName1));
214    assertNull(result.get(tableName2));
215
216    // append table column family: f1 of t3 to replication
217    tableCFs.clear();
218    tableCFs.put(tableName3, new ArrayList<>());
219    tableCFs.get(tableName3).add("f1");
220    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
221    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
222    assertEquals(3, result.size());
223    assertTrue("Should contain t1", result.containsKey(tableName1));
224    assertTrue("Should contain t2", result.containsKey(tableName2));
225    assertTrue("Should contain t3", result.containsKey(tableName3));
226    assertNull(result.get(tableName1));
227    assertNull(result.get(tableName2));
228    assertEquals(1, result.get(tableName3).size());
229    assertEquals("f1", result.get(tableName3).get(0));
230
231    // append table column family: f1,f2 of t4 to replication
232    tableCFs.clear();
233    tableCFs.put(tableName4, new ArrayList<>());
234    tableCFs.get(tableName4).add("f1");
235    tableCFs.get(tableName4).add("f2");
236    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
237    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
238    assertEquals(4, result.size());
239    assertTrue("Should contain t1", result.containsKey(tableName1));
240    assertTrue("Should contain t2", result.containsKey(tableName2));
241    assertTrue("Should contain t3", result.containsKey(tableName3));
242    assertTrue("Should contain t4", result.containsKey(tableName4));
243    assertNull(result.get(tableName1));
244    assertNull(result.get(tableName2));
245    assertEquals(1, result.get(tableName3).size());
246    assertEquals("f1", result.get(tableName3).get(0));
247    assertEquals(2, result.get(tableName4).size());
248    assertEquals("f1", result.get(tableName4).get(0));
249    assertEquals("f2", result.get(tableName4).get(1));
250
251    // append "table5" => [], then append "table5" => ["f1"]
252    tableCFs.clear();
253    tableCFs.put(tableName5, new ArrayList<>());
254    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
255    tableCFs.clear();
256    tableCFs.put(tableName5, new ArrayList<>());
257    tableCFs.get(tableName5).add("f1");
258    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
259    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
260    assertEquals(5, result.size());
261    assertTrue("Should contain t5", result.containsKey(tableName5));
262    // null means replication all cfs of tab5
263    assertNull(result.get(tableName5));
264
265    // append "table6" => ["f1"], then append "table6" => []
266    tableCFs.clear();
267    tableCFs.put(tableName6, new ArrayList<>());
268    tableCFs.get(tableName6).add("f1");
269    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
270    tableCFs.clear();
271    tableCFs.put(tableName6, new ArrayList<>());
272    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
273    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
274    assertEquals(6, result.size());
275    assertTrue("Should contain t6", result.containsKey(tableName6));
276    // null means replication all cfs of tab6
277    assertNull(result.get(tableName6));
278
279    admin.removeReplicationPeer(ID_ONE).join();
280  }
281
282  @Test
283  public void testRemovePeerTableCFs() throws Exception {
284    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
285    rpc1.setClusterKey(KEY_ONE);
286    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
287    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
288    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
289    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
290    // Add a valid peer
291    admin.addReplicationPeer(ID_ONE, rpc1).join();
292    rpc1.setReplicateAllUserTables(false);
293    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
294
295    Map<TableName, List<String>> tableCFs = new HashMap<>();
296    try {
297      tableCFs.put(tableName3, null);
298      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
299      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
300    } catch (CompletionException e) {
301      assertTrue(e.getCause() instanceof ReplicationException);
302    }
303    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
304
305    tableCFs.clear();
306    tableCFs.put(tableName1, null);
307    tableCFs.put(tableName2, new ArrayList<>());
308    tableCFs.get(tableName2).add("cf1");
309    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
310    try {
311      tableCFs.clear();
312      tableCFs.put(tableName3, null);
313      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
314      fail("Test case should fail as removing table-cfs from a peer whose"
315        + " table-cfs didn't contain t3");
316    } catch (CompletionException e) {
317      assertTrue(e.getCause() instanceof ReplicationException);
318    }
319    Map<TableName, List<String>> result =
320      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
321    assertEquals(2, result.size());
322    assertTrue("Should contain t1", result.containsKey(tableName1));
323    assertTrue("Should contain t2", result.containsKey(tableName2));
324    assertNull(result.get(tableName1));
325    assertEquals(1, result.get(tableName2).size());
326    assertEquals("cf1", result.get(tableName2).get(0));
327
328    try {
329      tableCFs.clear();
330      tableCFs.put(tableName1, new ArrayList<>());
331      tableCFs.get(tableName1).add("cf1");
332      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
333      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
334    } catch (CompletionException e) {
335      assertTrue(e.getCause() instanceof ReplicationException);
336    }
337    tableCFs.clear();
338    tableCFs.put(tableName1, null);
339    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
340    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
341    assertEquals(1, result.size());
342    assertEquals(1, result.get(tableName2).size());
343    assertEquals("cf1", result.get(tableName2).get(0));
344
345    try {
346      tableCFs.clear();
347      tableCFs.put(tableName2, null);
348      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
349      fail("Test case should fail, because table t2 hase specified cfs in peer config");
350    } catch (CompletionException e) {
351      assertTrue(e.getCause() instanceof ReplicationException);
352    }
353    tableCFs.clear();
354    tableCFs.put(tableName2, new ArrayList<>());
355    tableCFs.get(tableName2).add("cf1");
356    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
357    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
358
359    tableCFs.clear();
360    tableCFs.put(tableName4, new ArrayList<>());
361    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
362    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
363    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
364
365    admin.removeReplicationPeer(ID_ONE);
366  }
367
368  @Test
369  public void testSetPeerNamespaces() throws Exception {
370    String ns1 = "ns1";
371    String ns2 = "ns2";
372
373    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
374    rpc.setClusterKey(KEY_ONE);
375    admin.addReplicationPeer(ID_ONE, rpc).join();
376    rpc.setReplicateAllUserTables(false);
377    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
378
379    // add ns1 and ns2 to peer config
380    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
381    Set<String> namespaces = new HashSet<>();
382    namespaces.add(ns1);
383    namespaces.add(ns2);
384    rpc.setNamespaces(namespaces);
385    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
386    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
387    assertEquals(2, namespaces.size());
388    assertTrue(namespaces.contains(ns1));
389    assertTrue(namespaces.contains(ns2));
390
391    // update peer config only contains ns1
392    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
393    namespaces = new HashSet<>();
394    namespaces.add(ns1);
395    rpc.setNamespaces(namespaces);
396    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
397    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
398    assertEquals(1, namespaces.size());
399    assertTrue(namespaces.contains(ns1));
400
401    admin.removeReplicationPeer(ID_ONE).join();
402  }
403
404  @Test
405  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
406    String ns1 = "ns1";
407    String ns2 = "ns2";
408    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
409    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
410
411    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
412    rpc.setClusterKey(KEY_ONE);
413    admin.addReplicationPeer(ID_ONE, rpc).join();
414    rpc.setReplicateAllUserTables(false);
415    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
416
417    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
418    Set<String> namespaces = new HashSet<String>();
419    namespaces.add(ns1);
420    rpc.setNamespaces(namespaces);
421    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
422    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
423    Map<TableName, List<String>> tableCfs = new HashMap<>();
424    tableCfs.put(tableName1, new ArrayList<>());
425    rpc.setTableCFsMap(tableCfs);
426    try {
427      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
428      fail(
429        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
430    } catch (CompletionException e) {
431      // OK
432    }
433
434    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
435    tableCfs.clear();
436    tableCfs.put(tableName2, new ArrayList<>());
437    rpc.setTableCFsMap(tableCfs);
438    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
439    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
440    namespaces.clear();
441    namespaces.add(ns2);
442    rpc.setNamespaces(namespaces);
443    try {
444      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
445      fail(
446        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
447    } catch (CompletionException e) {
448      // OK
449    }
450
451    admin.removeReplicationPeer(ID_ONE).join();
452  }
453
454  @Test
455  public void testPeerBandwidth() throws Exception {
456    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
457    rpc.setClusterKey(KEY_ONE);
458
459    admin.addReplicationPeer(ID_ONE, rpc).join();
460    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
461    assertEquals(0, rpc.getBandwidth());
462
463    rpc.setBandwidth(2097152);
464    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
465    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
466
467    admin.removeReplicationPeer(ID_ONE).join();
468  }
469
470  @Test
471  public void testInvalidClusterKey() throws InterruptedException {
472    try {
473      admin.addReplicationPeer(ID_ONE,
474        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
475      fail();
476    } catch (ExecutionException e) {
477      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
478    }
479  }
480
481  @Test
482  public void testClusterKeyWithTrailingSpace() throws Exception {
483    admin.addReplicationPeer(ID_ONE,
484      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get();
485    String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey();
486    assertEquals(KEY_ONE, clusterKey);
487  }
488
489  @Test
490  public void testInvalidReplicationEndpoint() throws InterruptedException {
491    try {
492      admin.addReplicationPeer(ID_ONE,
493        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
494      fail();
495    } catch (ExecutionException e) {
496      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
497      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
498    }
499  }
500
501  @Test
502  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
503    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
504    admin
505      .addReplicationPeer(ID_ONE,
506        ReplicationPeerConfig.newBuilder()
507          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
508      .get();
509
510    // but we still need to check cluster key if we specify the default ReplicationEndpoint
511    try {
512      admin
513        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
514          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
515        .get();
516      fail();
517    } catch (ExecutionException e) {
518      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
519    }
520  }
521
522  /*
523   * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist.
524   */
525  @Test
526  public void testReplicationPeerNotFoundException() throws InterruptedException {
527    String dummyPeer = "dummy_peer";
528    try {
529      admin.removeReplicationPeer(dummyPeer).get();
530      fail();
531    } catch (ExecutionException e) {
532      assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class));
533    }
534  }
535}