001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
021import static org.hamcrest.CoreMatchers.instanceOf;
022import static org.hamcrest.CoreMatchers.startsWith;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertFalse;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.assertNull;
027import static org.junit.Assert.assertThat;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.ArrayList;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.UUID;
039import java.util.concurrent.CompletionException;
040import java.util.concurrent.ExecutionException;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HConstants;
044import org.apache.hadoop.hbase.ServerName;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
047import org.apache.hadoop.hbase.replication.ReplicationException;
048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
049import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
051import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
052import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
053import org.apache.hadoop.hbase.testclassification.ClientTests;
054import org.apache.hadoop.hbase.testclassification.LargeTests;
055import org.junit.After;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.runner.RunWith;
061import org.junit.runners.Parameterized;
062
063/**
064 * Class to test asynchronous replication admin operations.
065 */
066@RunWith(Parameterized.class)
067@Category({ LargeTests.class, ClientTests.class })
068public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
069
070  @ClassRule
071  public static final HBaseClassTestRule CLASS_RULE =
072    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
073
074  private final String ID_ONE = "1";
075  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
076  private final String ID_TWO = "2";
077  private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
078
079  @BeforeClass
080  public static void setUpBeforeClass() throws Exception {
081    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
082    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
083    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
084    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
085    TEST_UTIL.startMiniCluster();
086    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
087  }
088
089  @After
090  public void clearPeerAndQueues() throws IOException, ReplicationException {
091    try {
092      admin.removeReplicationPeer(ID_ONE).join();
093    } catch (Exception e) {
094    }
095    try {
096      admin.removeReplicationPeer(ID_TWO).join();
097    } catch (Exception e) {
098    }
099    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
100      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
101    for (ServerName serverName : queueStorage.getListOfReplicators()) {
102      for (String queue : queueStorage.getAllQueues(serverName)) {
103        queueStorage.removeQueue(serverName, queue);
104      }
105    }
106  }
107
108  @Test
109  public void testAddRemovePeer() throws Exception {
110    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
111    rpc1.setClusterKey(KEY_ONE);
112    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
113    rpc2.setClusterKey(KEY_TWO);
114    // Add a valid peer
115    admin.addReplicationPeer(ID_ONE, rpc1).join();
116    // try adding the same (fails)
117    try {
118      admin.addReplicationPeer(ID_ONE, rpc1).join();
119      fail("Test case should fail as adding a same peer.");
120    } catch (CompletionException e) {
121      // OK!
122    }
123    assertEquals(1, admin.listReplicationPeers().get().size());
124    // Try to remove an inexisting peer
125    try {
126      admin.removeReplicationPeer(ID_TWO).join();
127      fail("Test case should fail as removing a inexisting peer.");
128    } catch (CompletionException e) {
129      // OK!
130    }
131    assertEquals(1, admin.listReplicationPeers().get().size());
132    // Add a second since multi-slave is supported
133    admin.addReplicationPeer(ID_TWO, rpc2).join();
134    assertEquals(2, admin.listReplicationPeers().get().size());
135    // Remove the first peer we added
136    admin.removeReplicationPeer(ID_ONE).join();
137    assertEquals(1, admin.listReplicationPeers().get().size());
138    admin.removeReplicationPeer(ID_TWO).join();
139    assertEquals(0, admin.listReplicationPeers().get().size());
140  }
141
142  @Test
143  public void testPeerConfig() throws Exception {
144    ReplicationPeerConfig config = new ReplicationPeerConfig();
145    config.setClusterKey(KEY_ONE);
146    config.getConfiguration().put("key1", "value1");
147    config.getConfiguration().put("key2", "value2");
148    admin.addReplicationPeer(ID_ONE, config).join();
149
150    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
151    assertEquals(1, peers.size());
152    ReplicationPeerDescription peerOne = peers.get(0);
153    assertNotNull(peerOne);
154    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
155    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
156
157    admin.removeReplicationPeer(ID_ONE).join();
158  }
159
160  @Test
161  public void testEnableDisablePeer() throws Exception {
162    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
163    rpc1.setClusterKey(KEY_ONE);
164    admin.addReplicationPeer(ID_ONE, rpc1).join();
165    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
166    assertEquals(1, peers.size());
167    assertTrue(peers.get(0).isEnabled());
168
169    admin.disableReplicationPeer(ID_ONE).join();
170    peers = admin.listReplicationPeers().get();
171    assertEquals(1, peers.size());
172    assertFalse(peers.get(0).isEnabled());
173    admin.removeReplicationPeer(ID_ONE).join();
174  }
175
176  @Test
177  public void testAppendPeerTableCFs() throws Exception {
178    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
179    rpc1.setClusterKey(KEY_ONE);
180    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
181    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
182    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
183    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
184    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
185    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
186
187    // Add a valid peer
188    admin.addReplicationPeer(ID_ONE, rpc1).join();
189    rpc1.setReplicateAllUserTables(false);
190    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
191
192    Map<TableName, List<String>> tableCFs = new HashMap<>();
193
194    // append table t1 to replication
195    tableCFs.put(tableName1, null);
196    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
197    Map<TableName, List<String>> result =
198      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
199    assertEquals(1, result.size());
200    assertEquals(true, result.containsKey(tableName1));
201    assertNull(result.get(tableName1));
202
203    // append table t2 to replication
204    tableCFs.clear();
205    tableCFs.put(tableName2, null);
206    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
207    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
208    assertEquals(2, result.size());
209    assertTrue("Should contain t1", result.containsKey(tableName1));
210    assertTrue("Should contain t2", result.containsKey(tableName2));
211    assertNull(result.get(tableName1));
212    assertNull(result.get(tableName2));
213
214    // append table column family: f1 of t3 to replication
215    tableCFs.clear();
216    tableCFs.put(tableName3, new ArrayList<>());
217    tableCFs.get(tableName3).add("f1");
218    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
219    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
220    assertEquals(3, result.size());
221    assertTrue("Should contain t1", result.containsKey(tableName1));
222    assertTrue("Should contain t2", result.containsKey(tableName2));
223    assertTrue("Should contain t3", result.containsKey(tableName3));
224    assertNull(result.get(tableName1));
225    assertNull(result.get(tableName2));
226    assertEquals(1, result.get(tableName3).size());
227    assertEquals("f1", result.get(tableName3).get(0));
228
229    // append table column family: f1,f2 of t4 to replication
230    tableCFs.clear();
231    tableCFs.put(tableName4, new ArrayList<>());
232    tableCFs.get(tableName4).add("f1");
233    tableCFs.get(tableName4).add("f2");
234    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
235    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
236    assertEquals(4, result.size());
237    assertTrue("Should contain t1", result.containsKey(tableName1));
238    assertTrue("Should contain t2", result.containsKey(tableName2));
239    assertTrue("Should contain t3", result.containsKey(tableName3));
240    assertTrue("Should contain t4", result.containsKey(tableName4));
241    assertNull(result.get(tableName1));
242    assertNull(result.get(tableName2));
243    assertEquals(1, result.get(tableName3).size());
244    assertEquals("f1", result.get(tableName3).get(0));
245    assertEquals(2, result.get(tableName4).size());
246    assertEquals("f1", result.get(tableName4).get(0));
247    assertEquals("f2", result.get(tableName4).get(1));
248
249    // append "table5" => [], then append "table5" => ["f1"]
250    tableCFs.clear();
251    tableCFs.put(tableName5, new ArrayList<>());
252    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
253    tableCFs.clear();
254    tableCFs.put(tableName5, new ArrayList<>());
255    tableCFs.get(tableName5).add("f1");
256    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
257    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
258    assertEquals(5, result.size());
259    assertTrue("Should contain t5", result.containsKey(tableName5));
260    // null means replication all cfs of tab5
261    assertNull(result.get(tableName5));
262
263    // append "table6" => ["f1"], then append "table6" => []
264    tableCFs.clear();
265    tableCFs.put(tableName6, new ArrayList<>());
266    tableCFs.get(tableName6).add("f1");
267    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
268    tableCFs.clear();
269    tableCFs.put(tableName6, new ArrayList<>());
270    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
271    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
272    assertEquals(6, result.size());
273    assertTrue("Should contain t6", result.containsKey(tableName6));
274    // null means replication all cfs of tab6
275    assertNull(result.get(tableName6));
276
277    admin.removeReplicationPeer(ID_ONE).join();
278  }
279
280  @Test
281  public void testRemovePeerTableCFs() throws Exception {
282    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
283    rpc1.setClusterKey(KEY_ONE);
284    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
285    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
286    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
287    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
288    // Add a valid peer
289    admin.addReplicationPeer(ID_ONE, rpc1).join();
290    rpc1.setReplicateAllUserTables(false);
291    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
292
293    Map<TableName, List<String>> tableCFs = new HashMap<>();
294    try {
295      tableCFs.put(tableName3, null);
296      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
297      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
298    } catch (CompletionException e) {
299      assertTrue(e.getCause() instanceof ReplicationException);
300    }
301    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
302
303    tableCFs.clear();
304    tableCFs.put(tableName1, null);
305    tableCFs.put(tableName2, new ArrayList<>());
306    tableCFs.get(tableName2).add("cf1");
307    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
308    try {
309      tableCFs.clear();
310      tableCFs.put(tableName3, null);
311      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
312      fail("Test case should fail as removing table-cfs from a peer whose" +
313        " table-cfs didn't contain t3");
314    } catch (CompletionException e) {
315      assertTrue(e.getCause() instanceof ReplicationException);
316    }
317    Map<TableName, List<String>> result =
318      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
319    assertEquals(2, result.size());
320    assertTrue("Should contain t1", result.containsKey(tableName1));
321    assertTrue("Should contain t2", result.containsKey(tableName2));
322    assertNull(result.get(tableName1));
323    assertEquals(1, result.get(tableName2).size());
324    assertEquals("cf1", result.get(tableName2).get(0));
325
326    try {
327      tableCFs.clear();
328      tableCFs.put(tableName1, new ArrayList<>());
329      tableCFs.get(tableName1).add("cf1");
330      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
331      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
332    } catch (CompletionException e) {
333      assertTrue(e.getCause() instanceof ReplicationException);
334    }
335    tableCFs.clear();
336    tableCFs.put(tableName1, null);
337    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
338    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
339    assertEquals(1, result.size());
340    assertEquals(1, result.get(tableName2).size());
341    assertEquals("cf1", result.get(tableName2).get(0));
342
343    try {
344      tableCFs.clear();
345      tableCFs.put(tableName2, null);
346      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
347      fail("Test case should fail, because table t2 hase specified cfs in peer config");
348    } catch (CompletionException e) {
349      assertTrue(e.getCause() instanceof ReplicationException);
350    }
351    tableCFs.clear();
352    tableCFs.put(tableName2, new ArrayList<>());
353    tableCFs.get(tableName2).add("cf1");
354    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
355    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
356
357    tableCFs.clear();
358    tableCFs.put(tableName4, new ArrayList<>());
359    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
360    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
361    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
362
363    admin.removeReplicationPeer(ID_ONE);
364  }
365
366  @Test
367  public void testSetPeerNamespaces() throws Exception {
368    String ns1 = "ns1";
369    String ns2 = "ns2";
370
371    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
372    rpc.setClusterKey(KEY_ONE);
373    admin.addReplicationPeer(ID_ONE, rpc).join();
374    rpc.setReplicateAllUserTables(false);
375    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
376
377    // add ns1 and ns2 to peer config
378    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
379    Set<String> namespaces = new HashSet<>();
380    namespaces.add(ns1);
381    namespaces.add(ns2);
382    rpc.setNamespaces(namespaces);
383    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
384    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
385    assertEquals(2, namespaces.size());
386    assertTrue(namespaces.contains(ns1));
387    assertTrue(namespaces.contains(ns2));
388
389    // update peer config only contains ns1
390    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
391    namespaces = new HashSet<>();
392    namespaces.add(ns1);
393    rpc.setNamespaces(namespaces);
394    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
395    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
396    assertEquals(1, namespaces.size());
397    assertTrue(namespaces.contains(ns1));
398
399    admin.removeReplicationPeer(ID_ONE).join();
400  }
401
402  @Test
403  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
404    String ns1 = "ns1";
405    String ns2 = "ns2";
406    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
407    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
408
409    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
410    rpc.setClusterKey(KEY_ONE);
411    admin.addReplicationPeer(ID_ONE, rpc).join();
412    rpc.setReplicateAllUserTables(false);
413    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
414
415    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
416    Set<String> namespaces = new HashSet<String>();
417    namespaces.add(ns1);
418    rpc.setNamespaces(namespaces);
419    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
420    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
421    Map<TableName, List<String>> tableCfs = new HashMap<>();
422    tableCfs.put(tableName1, new ArrayList<>());
423    rpc.setTableCFsMap(tableCfs);
424    try {
425      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
426      fail(
427        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
428    } catch (CompletionException e) {
429      // OK
430    }
431
432    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
433    tableCfs.clear();
434    tableCfs.put(tableName2, new ArrayList<>());
435    rpc.setTableCFsMap(tableCfs);
436    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
437    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
438    namespaces.clear();
439    namespaces.add(ns2);
440    rpc.setNamespaces(namespaces);
441    try {
442      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
443      fail(
444        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
445    } catch (CompletionException e) {
446      // OK
447    }
448
449    admin.removeReplicationPeer(ID_ONE).join();
450  }
451
452  @Test
453  public void testPeerBandwidth() throws Exception {
454    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
455    rpc.setClusterKey(KEY_ONE);
456
457    admin.addReplicationPeer(ID_ONE, rpc).join();
458    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
459    assertEquals(0, rpc.getBandwidth());
460
461    rpc.setBandwidth(2097152);
462    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
463    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
464
465    admin.removeReplicationPeer(ID_ONE).join();
466  }
467
468  @Test
469  public void testInvalidClusterKey() throws InterruptedException {
470    try {
471      admin.addReplicationPeer(ID_ONE,
472        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
473      fail();
474    } catch (ExecutionException e) {
475      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
476    }
477  }
478
479  @Test
480  public void testInvalidReplicationEndpoint() throws InterruptedException {
481    try {
482      admin.addReplicationPeer(ID_ONE,
483        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
484      fail();
485    } catch (ExecutionException e) {
486      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
487      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
488    }
489  }
490
491  public static final class DummyReplicationEndpoint extends BaseReplicationEndpoint {
492
493    @Override
494    public UUID getPeerUUID() {
495      return ctx.getClusterId();
496    }
497
498    @Override
499    public boolean replicate(ReplicateContext replicateContext) {
500      return true;
501    }
502
503    @Override
504    public void start() {
505      startAsync();
506    }
507
508    @Override
509    public void stop() {
510      stopAsync();
511    }
512
513    @Override
514    protected void doStart() {
515      notifyStarted();
516    }
517
518    @Override
519    protected void doStop() {
520      notifyStopped();
521    }
522  }
523
524  @Test
525  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
526    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
527    admin.addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder()
528      .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build()).get();
529
530    // but we still need to check cluster key if we specify the default ReplicationEndpoint
531    try {
532      admin
533        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
534          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
535        .get();
536      fail();
537    } catch (ExecutionException e) {
538      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
539    }
540  }
541}