001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.hamcrest.Matchers.containsString;
023import static org.hamcrest.Matchers.instanceOf;
024import static org.hamcrest.Matchers.startsWith;
025import static org.junit.jupiter.api.Assertions.assertEquals;
026import static org.junit.jupiter.api.Assertions.assertFalse;
027import static org.junit.jupiter.api.Assertions.assertNotNull;
028import static org.junit.jupiter.api.Assertions.assertNull;
029import static org.junit.jupiter.api.Assertions.assertThrows;
030import static org.junit.jupiter.api.Assertions.assertTrue;
031import static org.junit.jupiter.api.Assertions.fail;
032
033import java.io.IOException;
034import java.util.ArrayList;
035import java.util.HashMap;
036import java.util.HashSet;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.concurrent.CompletionException;
041import java.util.concurrent.ExecutionException;
042import java.util.function.Supplier;
043import org.apache.hadoop.hbase.DoNotRetryIOException;
044import org.apache.hadoop.hbase.HBaseParameterizedTestTemplate;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.replication.ReplicationException;
049import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
050import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
051import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
052import org.apache.hadoop.hbase.replication.ReplicationQueueData;
053import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
054import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
055import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
056import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
057import org.apache.hadoop.hbase.testclassification.ClientTests;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.junit.jupiter.api.AfterAll;
060import org.junit.jupiter.api.AfterEach;
061import org.junit.jupiter.api.BeforeAll;
062import org.junit.jupiter.api.Tag;
063import org.junit.jupiter.api.TestTemplate;
064
065/**
066 * Class to test asynchronous replication admin operations.
067 */
068@Tag(LargeTests.TAG)
069@Tag(ClientTests.TAG)
070@HBaseParameterizedTestTemplate(name = "{index}: policy = {0}")
071public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
072
073  private final String ID_ONE = "1";
074  private static String KEY_ONE;
075  private final String ID_TWO = "2";
076  private static String KEY_TWO;
077
  /**
   * Creates the test instance.
   * @param admin supplier of the {@link AsyncAdmin} to test against; passed unchanged to the
   *              {@link TestAsyncAdminBase} constructor
   */
  public TestAsyncReplicationAdminApi(Supplier<AsyncAdmin> admin) {
    super(admin);
  }
081
082  @BeforeAll
083  public static void setUpBeforeClass() throws Exception {
084    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
085    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
086    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
087    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
088    TEST_UTIL.startMiniCluster();
089    KEY_ONE = TEST_UTIL.getZkConnectionURI() + "-test1";
090    KEY_TWO = TEST_UTIL.getZkConnectionURI() + "-test2";
091    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
092  }
093
  /**
   * One-time teardown; delegates entirely to {@link TestAsyncAdminBase#tearDownAfterClass()}.
   */
  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TestAsyncAdminBase.tearDownAfterClass();
  }
098
099  @AfterEach
100  public void clearPeerAndQueues() throws IOException, ReplicationException {
101    try {
102      admin.removeReplicationPeer(ID_ONE).join();
103    } catch (Exception e) {
104    }
105    try {
106      admin.removeReplicationPeer(ID_TWO).join();
107    } catch (Exception e) {
108    }
109    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
110      .getReplicationQueueStorage(TEST_UTIL.getConnection(), TEST_UTIL.getConfiguration());
111    for (ReplicationQueueData queueData : queueStorage.listAllQueues()) {
112      queueStorage.removeQueue(queueData.getId());
113    }
114    admin.replicationPeerModificationSwitch(true).join();
115  }
116
117  @TestTemplate
118  public void testAddRemovePeer() throws Exception {
119    ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build();
120    ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build();
121    // Add a valid peer
122    admin.addReplicationPeer(ID_ONE, rpc1).join();
123    // try adding the same (fails)
124    try {
125      admin.addReplicationPeer(ID_ONE, rpc1).join();
126      fail("Test case should fail as adding a same peer.");
127    } catch (CompletionException e) {
128      // OK!
129    }
130    assertEquals(1, admin.listReplicationPeers().get().size());
131    // Try to remove an inexisting peer
132    try {
133      admin.removeReplicationPeer(ID_TWO).join();
134      fail("Test case should fail as removing a inexisting peer.");
135    } catch (CompletionException e) {
136      // OK!
137    }
138    assertEquals(1, admin.listReplicationPeers().get().size());
139    // Add a second since multi-slave is supported
140    admin.addReplicationPeer(ID_TWO, rpc2).join();
141    assertEquals(2, admin.listReplicationPeers().get().size());
142    // Remove the first peer we added
143    admin.removeReplicationPeer(ID_ONE).join();
144    assertEquals(1, admin.listReplicationPeers().get().size());
145    admin.removeReplicationPeer(ID_TWO).join();
146    assertEquals(0, admin.listReplicationPeers().get().size());
147  }
148
149  @TestTemplate
150  public void testPeerConfig() throws Exception {
151    ReplicationPeerConfig config = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE)
152      .putConfiguration("key1", "value1").putConfiguration("key2", "value2").build();
153    admin.addReplicationPeer(ID_ONE, config).join();
154
155    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
156    assertEquals(1, peers.size());
157    ReplicationPeerDescription peerOne = peers.get(0);
158    assertNotNull(peerOne);
159    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
160    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
161
162    admin.removeReplicationPeer(ID_ONE).join();
163  }
164
165  @TestTemplate
166  public void testEnableDisablePeer() throws Exception {
167    ReplicationPeerConfig rpc1 = ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build();
168    admin.addReplicationPeer(ID_ONE, rpc1).join();
169    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
170    assertEquals(1, peers.size());
171    assertTrue(peers.get(0).isEnabled());
172
173    admin.disableReplicationPeer(ID_ONE).join();
174    peers = admin.listReplicationPeers().get();
175    assertEquals(1, peers.size());
176    assertFalse(peers.get(0).isEnabled());
177    admin.removeReplicationPeer(ID_ONE).join();
178  }
179
180  @TestTemplate
181  public void testAppendPeerTableCFs() throws Exception {
182    ReplicationPeerConfigBuilder rpcBuilder =
183      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
184    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
185    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
186    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
187    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
188    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
189    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
190
191    // Add a valid peer
192    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
193    rpcBuilder.setReplicateAllUserTables(false);
194    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
195
196    Map<TableName, List<String>> tableCFs = new HashMap<>();
197
198    // append table t1 to replication
199    tableCFs.put(tableName1, null);
200    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
201    Map<TableName, List<String>> result =
202      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
203    assertEquals(1, result.size());
204    assertEquals(true, result.containsKey(tableName1));
205    assertNull(result.get(tableName1));
206
207    // append table t2 to replication
208    tableCFs.clear();
209    tableCFs.put(tableName2, null);
210    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
211    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
212    assertEquals(2, result.size());
213    assertTrue(result.containsKey(tableName1), "Should contain t1");
214    assertTrue(result.containsKey(tableName2), "Should contain t2");
215    assertNull(result.get(tableName1));
216    assertNull(result.get(tableName2));
217
218    // append table column family: f1 of t3 to replication
219    tableCFs.clear();
220    tableCFs.put(tableName3, new ArrayList<>());
221    tableCFs.get(tableName3).add("f1");
222    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
223    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
224    assertEquals(3, result.size());
225    assertTrue(result.containsKey(tableName1), "Should contain t1");
226    assertTrue(result.containsKey(tableName2), "Should contain t2");
227    assertTrue(result.containsKey(tableName3), "Should contain t3");
228    assertNull(result.get(tableName1));
229    assertNull(result.get(tableName2));
230    assertEquals(1, result.get(tableName3).size());
231    assertEquals("f1", result.get(tableName3).get(0));
232
233    // append table column family: f1,f2 of t4 to replication
234    tableCFs.clear();
235    tableCFs.put(tableName4, new ArrayList<>());
236    tableCFs.get(tableName4).add("f1");
237    tableCFs.get(tableName4).add("f2");
238    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
239    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
240    assertEquals(4, result.size());
241    assertTrue(result.containsKey(tableName1), "Should contain t1");
242    assertTrue(result.containsKey(tableName2), "Should contain t2");
243    assertTrue(result.containsKey(tableName3), "Should contain t3");
244    assertTrue(result.containsKey(tableName4), "Should contain t4");
245    assertNull(result.get(tableName1));
246    assertNull(result.get(tableName2));
247    assertEquals(1, result.get(tableName3).size());
248    assertEquals("f1", result.get(tableName3).get(0));
249    assertEquals(2, result.get(tableName4).size());
250    assertEquals("f1", result.get(tableName4).get(0));
251    assertEquals("f2", result.get(tableName4).get(1));
252
253    // append "table5" => [], then append "table5" => ["f1"]
254    tableCFs.clear();
255    tableCFs.put(tableName5, new ArrayList<>());
256    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
257    tableCFs.clear();
258    tableCFs.put(tableName5, new ArrayList<>());
259    tableCFs.get(tableName5).add("f1");
260    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
261    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
262    assertEquals(5, result.size());
263    assertTrue(result.containsKey(tableName5), "Should contain t5");
264    // null means replication all cfs of tab5
265    assertNull(result.get(tableName5));
266
267    // append "table6" => ["f1"], then append "table6" => []
268    tableCFs.clear();
269    tableCFs.put(tableName6, new ArrayList<>());
270    tableCFs.get(tableName6).add("f1");
271    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
272    tableCFs.clear();
273    tableCFs.put(tableName6, new ArrayList<>());
274    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
275    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
276    assertEquals(6, result.size());
277    assertTrue(result.containsKey(tableName6), "Should contain t6");
278    // null means replication all cfs of tab6
279    assertNull(result.get(tableName6));
280
281    admin.removeReplicationPeer(ID_ONE).join();
282  }
283
284  @TestTemplate
285  public void testRemovePeerTableCFs() throws Exception {
286    ReplicationPeerConfigBuilder rpcBuilder =
287      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
288    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
289    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
290    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
291    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
292    // Add a valid peer
293    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
294    rpcBuilder.setReplicateAllUserTables(false);
295    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
296
297    Map<TableName, List<String>> tableCFs = new HashMap<>();
298    try {
299      tableCFs.put(tableName3, null);
300      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
301      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
302    } catch (CompletionException e) {
303      assertTrue(e.getCause() instanceof ReplicationException);
304    }
305    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
306
307    tableCFs.clear();
308    tableCFs.put(tableName1, null);
309    tableCFs.put(tableName2, new ArrayList<>());
310    tableCFs.get(tableName2).add("cf1");
311    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
312    try {
313      tableCFs.clear();
314      tableCFs.put(tableName3, null);
315      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
316      fail("Test case should fail as removing table-cfs from a peer whose"
317        + " table-cfs didn't contain t3");
318    } catch (CompletionException e) {
319      assertTrue(e.getCause() instanceof ReplicationException);
320    }
321    Map<TableName, List<String>> result =
322      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
323    assertEquals(2, result.size());
324    assertTrue(result.containsKey(tableName1), "Should contain t1");
325    assertTrue(result.containsKey(tableName2), "Should contain t2");
326    assertNull(result.get(tableName1));
327    assertEquals(1, result.get(tableName2).size());
328    assertEquals("cf1", result.get(tableName2).get(0));
329
330    try {
331      tableCFs.clear();
332      tableCFs.put(tableName1, new ArrayList<>());
333      tableCFs.get(tableName1).add("cf1");
334      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
335      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
336    } catch (CompletionException e) {
337      assertTrue(e.getCause() instanceof ReplicationException);
338    }
339    tableCFs.clear();
340    tableCFs.put(tableName1, null);
341    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
342    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
343    assertEquals(1, result.size());
344    assertEquals(1, result.get(tableName2).size());
345    assertEquals("cf1", result.get(tableName2).get(0));
346
347    try {
348      tableCFs.clear();
349      tableCFs.put(tableName2, null);
350      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
351      fail("Test case should fail, because table t2 hase specified cfs in peer config");
352    } catch (CompletionException e) {
353      assertTrue(e.getCause() instanceof ReplicationException);
354    }
355    tableCFs.clear();
356    tableCFs.put(tableName2, new ArrayList<>());
357    tableCFs.get(tableName2).add("cf1");
358    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
359    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
360
361    tableCFs.clear();
362    tableCFs.put(tableName4, new ArrayList<>());
363    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
364    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
365    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
366
367    admin.removeReplicationPeer(ID_ONE);
368  }
369
370  @TestTemplate
371  public void testSetPeerNamespaces() throws Exception {
372    String ns1 = "ns1";
373    String ns2 = "ns2";
374
375    ReplicationPeerConfigBuilder rpcBuilder =
376      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
377    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
378    rpcBuilder.setReplicateAllUserTables(false);
379    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
380
381    // add ns1 and ns2 to peer config
382    Set<String> namespaces = new HashSet<>();
383    namespaces.add(ns1);
384    namespaces.add(ns2);
385    rpcBuilder.setNamespaces(namespaces);
386    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
387    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
388    assertEquals(2, namespaces.size());
389    assertTrue(namespaces.contains(ns1));
390    assertTrue(namespaces.contains(ns2));
391
392    // update peer config only contains ns1
393    namespaces = new HashSet<>();
394    namespaces.add(ns1);
395    rpcBuilder.setNamespaces(namespaces);
396    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
397    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
398    assertEquals(1, namespaces.size());
399    assertTrue(namespaces.contains(ns1));
400
401    admin.removeReplicationPeer(ID_ONE).join();
402  }
403
404  @TestTemplate
405  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
406    String ns1 = "ns1";
407    String ns2 = "ns2";
408    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
409    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
410
411    ReplicationPeerConfigBuilder rpcBuilder =
412      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
413    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
414    rpcBuilder.setReplicateAllUserTables(false);
415    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
416
417    Set<String> namespaces = new HashSet<String>();
418    namespaces.add(ns1);
419    rpcBuilder.setNamespaces(namespaces);
420    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get();
421    Map<TableName, List<String>> tableCfs = new HashMap<>();
422    tableCfs.put(tableName1, new ArrayList<>());
423    rpcBuilder.setTableCFsMap(tableCfs);
424    try {
425      admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
426      fail(
427        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
428    } catch (CompletionException e) {
429      // OK
430    }
431
432    tableCfs.clear();
433    tableCfs.put(tableName2, new ArrayList<>());
434    rpcBuilder.setTableCFsMap(tableCfs);
435    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).get();
436    namespaces.clear();
437    namespaces.add(ns2);
438    rpcBuilder.setNamespaces(namespaces);
439    try {
440      admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
441      fail(
442        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
443    } catch (CompletionException e) {
444      // OK
445    }
446
447    admin.removeReplicationPeer(ID_ONE).join();
448  }
449
450  @TestTemplate
451  public void testPeerBandwidth() throws Exception {
452    ReplicationPeerConfigBuilder rpcBuilder =
453      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE);
454
455    admin.addReplicationPeer(ID_ONE, rpcBuilder.build()).join();
456    ;
457    assertEquals(0, admin.getReplicationPeerConfig(ID_ONE).get().getBandwidth());
458
459    rpcBuilder.setBandwidth(2097152);
460    admin.updateReplicationPeerConfig(ID_ONE, rpcBuilder.build()).join();
461    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
462
463    admin.removeReplicationPeer(ID_ONE).join();
464  }
465
466  @TestTemplate
467  public void testInvalidClusterKey() throws InterruptedException {
468    try {
469      admin.addReplicationPeer(ID_ONE,
470        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
471      fail();
472    } catch (ExecutionException e) {
473      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
474    }
475  }
476
477  @TestTemplate
478  public void testClusterKeyWithTrailingSpace() throws Exception {
479    admin.addReplicationPeer(ID_ONE,
480      ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE + " ").build()).get();
481    String clusterKey = admin.getReplicationPeerConfig(ID_ONE).get().getClusterKey();
482    assertEquals(KEY_ONE, clusterKey);
483  }
484
485  @TestTemplate
486  public void testInvalidReplicationEndpoint() throws InterruptedException {
487    try {
488      admin.addReplicationPeer(ID_ONE,
489        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
490      fail();
491    } catch (ExecutionException e) {
492      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
493      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
494    }
495  }
496
497  @TestTemplate
498  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
499    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
500    admin
501      .addReplicationPeer(ID_ONE,
502        ReplicationPeerConfig.newBuilder()
503          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
504      .get();
505
506    // but we still need to check cluster key if we specify the default ReplicationEndpoint
507    try {
508      admin
509        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
510          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
511        .get();
512      fail();
513    } catch (ExecutionException e) {
514      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
515    }
516  }
517
518  /**
519   * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist.
520   */
521  @TestTemplate
522  public void testReplicationPeerNotFoundException() throws InterruptedException {
523    String dummyPeer = "dummy_peer";
524    try {
525      admin.removeReplicationPeer(dummyPeer).get();
526      fail();
527    } catch (ExecutionException e) {
528      assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class));
529    }
530  }
531
532  @TestTemplate
533  public void testReplicationPeerModificationSwitch() throws Exception {
534    assertTrue(admin.isReplicationPeerModificationEnabled().get());
535    // disable modification, should returns true as it is enabled by default and the above
536    // assertion has confirmed it
537    assertTrue(admin.replicationPeerModificationSwitch(false).get());
538    ExecutionException error = assertThrows(ExecutionException.class, () -> admin
539      .addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build())
540      .get());
541    assertThat(error.getCause().getMessage(),
542      containsString("Replication peer modification disabled"));
543    // enable again, and the previous value should be false
544    assertFalse(admin.replicationPeerModificationSwitch(true).get());
545  }
546}