001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.regex.Pattern;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Admin;
044import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
045import org.apache.hadoop.hbase.replication.ReplicationException;
046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
047import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
051import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
052import org.apache.hadoop.hbase.testclassification.ClientTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.junit.After;
055import org.junit.AfterClass;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * Unit testing of ReplicationAdmin
067 */
068@Category({ MediumTests.class, ClientTests.class })
069public class TestReplicationAdmin {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073    HBaseClassTestRule.forClass(TestReplicationAdmin.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);
076
077  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
078
079  private final String ID_ONE = "1";
080  private static String KEY_ONE;
081  private final String ID_SECOND = "2";
082  private static String KEY_SECOND;
083
084  private static ReplicationAdmin admin;
085  private static Admin hbaseAdmin;
086
087  @Rule
088  public TestName name = new TestName();
089
090  /**
091   * @throws java.lang.Exception
092   */
093  @BeforeClass
094  public static void setUpBeforeClass() throws Exception {
095    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
096    TEST_UTIL.startMiniCluster();
097    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
098    hbaseAdmin = TEST_UTIL.getAdmin();
099    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
100    KEY_SECOND = TEST_UTIL.getClusterKey() + "-test2";
101  }
102
103  @AfterClass
104  public static void tearDownAfterClass() throws Exception {
105    if (admin != null) {
106      admin.close();
107    }
108    TEST_UTIL.shutdownMiniCluster();
109  }
110
111  @After
112  public void tearDown() throws Exception {
113    for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
114      hbaseAdmin.removeReplicationPeer(desc.getPeerId());
115    }
116    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
117      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
118    for (ServerName serverName : queueStorage.getListOfReplicators()) {
119      for (String queue : queueStorage.getAllQueues(serverName)) {
120        queueStorage.removeQueue(serverName, queue);
121      }
122      queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
123    }
124  }
125
126  @Test
127  public void testConcurrentPeerOperations() throws Exception {
128    int threadNum = 5;
129    AtomicLong successCount = new AtomicLong(0);
130
131    // Test concurrent add peer operation
132    Thread[] addPeers = new Thread[threadNum];
133    for (int i = 0; i < threadNum; i++) {
134      addPeers[i] = new Thread(() -> {
135        try {
136          hbaseAdmin.addReplicationPeer(ID_ONE,
137            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
138          successCount.incrementAndGet();
139        } catch (Exception e) {
140          LOG.debug("Got exception when add replication peer", e);
141        }
142      });
143      addPeers[i].start();
144    }
145    for (Thread addPeer : addPeers) {
146      addPeer.join();
147    }
148    assertEquals(1, successCount.get());
149
150    // Test concurrent remove peer operation
151    successCount.set(0);
152    Thread[] removePeers = new Thread[threadNum];
153    for (int i = 0; i < threadNum; i++) {
154      removePeers[i] = new Thread(() -> {
155        try {
156          hbaseAdmin.removeReplicationPeer(ID_ONE);
157          successCount.incrementAndGet();
158        } catch (Exception e) {
159          LOG.debug("Got exception when remove replication peer", e);
160        }
161      });
162      removePeers[i].start();
163    }
164    for (Thread removePeer : removePeers) {
165      removePeer.join();
166    }
167    assertEquals(1, successCount.get());
168
169    // Test concurrent add peer operation again
170    successCount.set(0);
171    addPeers = new Thread[threadNum];
172    for (int i = 0; i < threadNum; i++) {
173      addPeers[i] = new Thread(() -> {
174        try {
175          hbaseAdmin.addReplicationPeer(ID_ONE,
176            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
177          successCount.incrementAndGet();
178        } catch (Exception e) {
179          LOG.debug("Got exception when add replication peer", e);
180        }
181      });
182      addPeers[i].start();
183    }
184    for (Thread addPeer : addPeers) {
185      addPeer.join();
186    }
187    assertEquals(1, successCount.get());
188  }
189
190  @Test
191  public void testAddInvalidPeer() {
192    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
193    builder.setClusterKey(KEY_ONE);
194    try {
195      String invalidPeerId = "1-2";
196      hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build());
197      fail("Should fail as the peer id: " + invalidPeerId + " is invalid");
198    } catch (Exception e) {
199      // OK
200    }
201
202    try {
203      String invalidClusterKey = "2181:/hbase";
204      builder.setClusterKey(invalidClusterKey);
205      hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
206      fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid");
207    } catch (Exception e) {
208      // OK
209    }
210  }
211
212  /**
213   * Simple testing of adding and removing peers, basically shows that all interactions with ZK work
214   * n
215   */
216  @Test
217  public void testAddRemovePeer() throws Exception {
218    ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
219    rpc1.setClusterKey(KEY_ONE);
220    ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
221    rpc2.setClusterKey(KEY_SECOND);
222    // Add a valid peer
223    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
224    // try adding the same (fails)
225    try {
226      hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
227    } catch (Exception e) {
228      // OK!
229    }
230    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
231    // Try to remove an inexisting peer
232    try {
233      hbaseAdmin.removeReplicationPeer(ID_SECOND);
234      fail();
235    } catch (Exception e) {
236      // OK!
237    }
238    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
239    // Add a second since multi-slave is supported
240    try {
241      hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
242    } catch (Exception e) {
243      fail();
244    }
245    assertEquals(2, hbaseAdmin.listReplicationPeers().size());
246    // Remove the first peer we added
247    hbaseAdmin.removeReplicationPeer(ID_ONE);
248    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
249    hbaseAdmin.removeReplicationPeer(ID_SECOND);
250    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
251  }
252
253  @Test
254  public void testAddPeerWithState() throws Exception {
255    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
256    rpc1.setClusterKey(KEY_ONE);
257    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
258    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
259    hbaseAdmin.removeReplicationPeer(ID_ONE);
260
261    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
262    rpc2.setClusterKey(KEY_SECOND);
263    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
264    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
265    hbaseAdmin.removeReplicationPeer(ID_SECOND);
266  }
267
268  /**
269   * Tests that the peer configuration used by ReplicationAdmin contains all the peer's properties.
270   */
271  @Test
272  public void testPeerConfig() throws Exception {
273    ReplicationPeerConfig config = new ReplicationPeerConfig();
274    config.setClusterKey(KEY_ONE);
275    config.getConfiguration().put("key1", "value1");
276    config.getConfiguration().put("key2", "value2");
277    hbaseAdmin.addReplicationPeer(ID_ONE, config);
278
279    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
280    assertEquals(1, peers.size());
281    ReplicationPeerDescription peerOne = peers.get(0);
282    assertNotNull(peerOne);
283    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
284    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
285
286    hbaseAdmin.removeReplicationPeer(ID_ONE);
287  }
288
289  @Test
290  public void testAddPeerWithUnDeletedQueues() throws Exception {
291    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
292    rpc1.setClusterKey(KEY_ONE);
293    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
294    rpc2.setClusterKey(KEY_SECOND);
295    Configuration conf = TEST_UTIL.getConfiguration();
296    ReplicationQueueStorage queueStorage =
297      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
298
299    ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
300    // add queue for ID_ONE
301    queueStorage.addWAL(serverName, ID_ONE, "file1");
302    try {
303      admin.addPeer(ID_ONE, rpc1, null);
304      fail();
305    } catch (Exception e) {
306      // OK!
307    }
308    queueStorage.removeQueue(serverName, ID_ONE);
309    assertEquals(0, queueStorage.getAllQueues(serverName).size());
310
311    // add recovered queue for ID_ONE
312    queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
313    try {
314      admin.addPeer(ID_ONE, rpc2, null);
315      fail();
316    } catch (Exception e) {
317      // OK!
318    }
319  }
320
321  /**
322   * basic checks that when we add a peer that it is enabled, and that we can disable n
323   */
324  @Test
325  public void testEnableDisable() throws Exception {
326    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
327    rpc1.setClusterKey(KEY_ONE);
328    admin.addPeer(ID_ONE, rpc1, null);
329    assertEquals(1, admin.getPeersCount());
330    assertTrue(admin.getPeerState(ID_ONE));
331    admin.disablePeer(ID_ONE);
332
333    assertFalse(admin.getPeerState(ID_ONE));
334    try {
335      admin.getPeerState(ID_SECOND);
336    } catch (ReplicationPeerNotFoundException e) {
337      // OK!
338    }
339    admin.removePeer(ID_ONE);
340  }
341
342  @Test
343  public void testAppendPeerTableCFs() throws Exception {
344    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
345    rpc.setClusterKey(KEY_ONE);
346    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
347    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
348    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
349    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
350    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
351    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
352
353    // Add a valid peer
354    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
355
356    // Update peer config, not replicate all user tables
357    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
358    rpc.setReplicateAllUserTables(false);
359    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
360
361    Map<TableName, List<String>> tableCFs = new HashMap<>();
362    tableCFs.put(tableName1, null);
363    admin.appendPeerTableCFs(ID_ONE, tableCFs);
364    Map<TableName, List<String>> result =
365      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
366    assertEquals(1, result.size());
367    assertEquals(true, result.containsKey(tableName1));
368    assertNull(result.get(tableName1));
369
370    // append table t2 to replication
371    tableCFs.clear();
372    tableCFs.put(tableName2, null);
373    admin.appendPeerTableCFs(ID_ONE, tableCFs);
374    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
375    assertEquals(2, result.size());
376    assertTrue("Should contain t1", result.containsKey(tableName1));
377    assertTrue("Should contain t2", result.containsKey(tableName2));
378    assertNull(result.get(tableName1));
379    assertNull(result.get(tableName2));
380
381    // append table column family: f1 of t3 to replication
382    tableCFs.clear();
383    tableCFs.put(tableName3, new ArrayList<>());
384    tableCFs.get(tableName3).add("f1");
385    admin.appendPeerTableCFs(ID_ONE, tableCFs);
386    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
387    assertEquals(3, result.size());
388    assertTrue("Should contain t1", result.containsKey(tableName1));
389    assertTrue("Should contain t2", result.containsKey(tableName2));
390    assertTrue("Should contain t3", result.containsKey(tableName3));
391    assertNull(result.get(tableName1));
392    assertNull(result.get(tableName2));
393    assertEquals(1, result.get(tableName3).size());
394    assertEquals("f1", result.get(tableName3).get(0));
395
396    tableCFs.clear();
397    tableCFs.put(tableName4, new ArrayList<>());
398    tableCFs.get(tableName4).add("f1");
399    tableCFs.get(tableName4).add("f2");
400    admin.appendPeerTableCFs(ID_ONE, tableCFs);
401    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
402    assertEquals(4, result.size());
403    assertTrue("Should contain t1", result.containsKey(tableName1));
404    assertTrue("Should contain t2", result.containsKey(tableName2));
405    assertTrue("Should contain t3", result.containsKey(tableName3));
406    assertTrue("Should contain t4", result.containsKey(tableName4));
407    assertNull(result.get(tableName1));
408    assertNull(result.get(tableName2));
409    assertEquals(1, result.get(tableName3).size());
410    assertEquals("f1", result.get(tableName3).get(0));
411    assertEquals(2, result.get(tableName4).size());
412    assertEquals("f1", result.get(tableName4).get(0));
413    assertEquals("f2", result.get(tableName4).get(1));
414
415    // append "table5" => [], then append "table5" => ["f1"]
416    tableCFs.clear();
417    tableCFs.put(tableName5, new ArrayList<>());
418    admin.appendPeerTableCFs(ID_ONE, tableCFs);
419    tableCFs.clear();
420    tableCFs.put(tableName5, new ArrayList<>());
421    tableCFs.get(tableName5).add("f1");
422    admin.appendPeerTableCFs(ID_ONE, tableCFs);
423    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
424    assertEquals(5, result.size());
425    assertTrue("Should contain t5", result.containsKey(tableName5));
426    // null means replication all cfs of tab5
427    assertNull(result.get(tableName5));
428
429    // append "table6" => ["f1"], then append "table6" => []
430    tableCFs.clear();
431    tableCFs.put(tableName6, new ArrayList<>());
432    tableCFs.get(tableName6).add("f1");
433    admin.appendPeerTableCFs(ID_ONE, tableCFs);
434    tableCFs.clear();
435    tableCFs.put(tableName6, new ArrayList<>());
436    admin.appendPeerTableCFs(ID_ONE, tableCFs);
437    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
438    assertEquals(6, result.size());
439    assertTrue("Should contain t6", result.containsKey(tableName6));
440    // null means replication all cfs of tab6
441    assertNull(result.get(tableName6));
442
443    admin.removePeer(ID_ONE);
444  }
445
446  @Test
447  public void testRemovePeerTableCFs() throws Exception {
448    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
449    rpc.setClusterKey(KEY_ONE);
450    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
451    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
452    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
453    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
454
455    // Add a valid peer
456    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
457
458    // Update peer config, not replicate all user tables
459    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
460    rpc.setReplicateAllUserTables(false);
461    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
462
463    Map<TableName, List<String>> tableCFs = new HashMap<>();
464    try {
465      tableCFs.put(tableName3, null);
466      admin.removePeerTableCFs(ID_ONE, tableCFs);
467      assertTrue(false);
468    } catch (ReplicationException e) {
469    }
470    assertNull(admin.getPeerTableCFs(ID_ONE));
471
472    tableCFs.clear();
473    tableCFs.put(tableName1, null);
474    tableCFs.put(tableName2, new ArrayList<>());
475    tableCFs.get(tableName2).add("cf1");
476    admin.setPeerTableCFs(ID_ONE, tableCFs);
477    try {
478      tableCFs.clear();
479      tableCFs.put(tableName3, null);
480      admin.removePeerTableCFs(ID_ONE, tableCFs);
481      assertTrue(false);
482    } catch (ReplicationException e) {
483    }
484    Map<TableName, List<String>> result =
485      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
486    assertEquals(2, result.size());
487    assertTrue("Should contain t1", result.containsKey(tableName1));
488    assertTrue("Should contain t2", result.containsKey(tableName2));
489    assertNull(result.get(tableName1));
490    assertEquals(1, result.get(tableName2).size());
491    assertEquals("cf1", result.get(tableName2).get(0));
492
493    try {
494      tableCFs.clear();
495      tableCFs.put(tableName1, new ArrayList<>());
496      tableCFs.get(tableName1).add("f1");
497      admin.removePeerTableCFs(ID_ONE, tableCFs);
498      assertTrue(false);
499    } catch (ReplicationException e) {
500    }
501    tableCFs.clear();
502    tableCFs.put(tableName1, null);
503    admin.removePeerTableCFs(ID_ONE, tableCFs);
504    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
505    assertEquals(1, result.size());
506    assertEquals(1, result.get(tableName2).size());
507    assertEquals("cf1", result.get(tableName2).get(0));
508
509    try {
510      tableCFs.clear();
511      tableCFs.put(tableName2, null);
512      admin.removePeerTableCFs(ID_ONE, tableCFs);
513      fail();
514    } catch (ReplicationException e) {
515    }
516    tableCFs.clear();
517    tableCFs.put(tableName2, new ArrayList<>());
518    tableCFs.get(tableName2).add("cf1");
519    admin.removePeerTableCFs(ID_ONE, tableCFs);
520    assertNull(admin.getPeerTableCFs(ID_ONE));
521
522    tableCFs.clear();
523    tableCFs.put(tableName4, new ArrayList<>());
524    admin.setPeerTableCFs(ID_ONE, tableCFs);
525    admin.removePeerTableCFs(ID_ONE, tableCFs);
526    assertNull(admin.getPeerTableCFs(ID_ONE));
527
528    admin.removePeer(ID_ONE);
529  }
530
531  @Test
532  public void testSetPeerNamespaces() throws Exception {
533    String ns1 = "ns1";
534    String ns2 = "ns2";
535
536    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
537    rpc.setClusterKey(KEY_ONE);
538    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
539
540    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
541    rpc.setReplicateAllUserTables(false);
542    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
543
544    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
545    Set<String> namespaces = new HashSet<>();
546    namespaces.add(ns1);
547    namespaces.add(ns2);
548    rpc.setNamespaces(namespaces);
549    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
550    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
551    assertEquals(2, namespaces.size());
552    assertTrue(namespaces.contains(ns1));
553    assertTrue(namespaces.contains(ns2));
554
555    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
556    namespaces = new HashSet<>();
557    namespaces.add(ns1);
558    rpc.setNamespaces(namespaces);
559    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
560    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
561    assertEquals(1, namespaces.size());
562    assertTrue(namespaces.contains(ns1));
563
564    hbaseAdmin.removeReplicationPeer(ID_ONE);
565  }
566
567  @Test
568  public void testSetReplicateAllUserTables() throws Exception {
569    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
570    rpc.setClusterKey(KEY_ONE);
571    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
572
573    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
574    assertTrue(rpc.replicateAllUserTables());
575
576    rpc.setReplicateAllUserTables(false);
577    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
578    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
579    assertFalse(rpc.replicateAllUserTables());
580
581    rpc.setReplicateAllUserTables(true);
582    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
583    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
584    assertTrue(rpc.replicateAllUserTables());
585
586    hbaseAdmin.removeReplicationPeer(ID_ONE);
587  }
588
589  @Test
590  public void testPeerExcludeNamespaces() throws Exception {
591    String ns1 = "ns1";
592    String ns2 = "ns2";
593
594    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
595    rpc.setClusterKey(KEY_ONE);
596    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
597
598    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
599    assertTrue(rpc.replicateAllUserTables());
600
601    Set<String> namespaces = new HashSet<String>();
602    namespaces.add(ns1);
603    namespaces.add(ns2);
604    rpc.setExcludeNamespaces(namespaces);
605    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
606    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
607    assertEquals(2, namespaces.size());
608    assertTrue(namespaces.contains(ns1));
609    assertTrue(namespaces.contains(ns2));
610
611    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
612    namespaces = new HashSet<String>();
613    namespaces.add(ns1);
614    rpc.setExcludeNamespaces(namespaces);
615    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
616    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
617    assertEquals(1, namespaces.size());
618    assertTrue(namespaces.contains(ns1));
619
620    hbaseAdmin.removeReplicationPeer(ID_ONE);
621  }
622
623  @Test
624  public void testPeerExcludeTableCFs() throws Exception {
625    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
626    rpc.setClusterKey(KEY_ONE);
627    TableName tab1 = TableName.valueOf("t1");
628    TableName tab2 = TableName.valueOf("t2");
629    TableName tab3 = TableName.valueOf("t3");
630    TableName tab4 = TableName.valueOf("t4");
631
632    // Add a valid peer
633    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
634    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
635    assertTrue(rpc.replicateAllUserTables());
636
637    Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
638    tableCFs.put(tab1, null);
639    rpc.setExcludeTableCFsMap(tableCFs);
640    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
641    Map<TableName, List<String>> result =
642      hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
643    assertEquals(1, result.size());
644    assertEquals(true, result.containsKey(tab1));
645    assertNull(result.get(tab1));
646
647    tableCFs.put(tab2, new ArrayList<String>());
648    tableCFs.get(tab2).add("f1");
649    rpc.setExcludeTableCFsMap(tableCFs);
650    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
651    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
652    assertEquals(2, result.size());
653    assertTrue("Should contain t1", result.containsKey(tab1));
654    assertTrue("Should contain t2", result.containsKey(tab2));
655    assertNull(result.get(tab1));
656    assertEquals(1, result.get(tab2).size());
657    assertEquals("f1", result.get(tab2).get(0));
658
659    tableCFs.clear();
660    tableCFs.put(tab3, new ArrayList<String>());
661    tableCFs.put(tab4, new ArrayList<String>());
662    tableCFs.get(tab4).add("f1");
663    tableCFs.get(tab4).add("f2");
664    rpc.setExcludeTableCFsMap(tableCFs);
665    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
666    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
667    assertEquals(2, result.size());
668    assertTrue("Should contain t3", result.containsKey(tab3));
669    assertTrue("Should contain t4", result.containsKey(tab4));
670    assertNull(result.get(tab3));
671    assertEquals(2, result.get(tab4).size());
672    assertEquals("f1", result.get(tab4).get(0));
673    assertEquals("f2", result.get(tab4).get(1));
674
675    hbaseAdmin.removeReplicationPeer(ID_ONE);
676  }
677
678  @Test
679  public void testPeerConfigConflict() throws Exception {
680    // Default replicate_all flag is true
681    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
682    rpc.setClusterKey(KEY_ONE);
683
684    String ns1 = "ns1";
685    Set<String> namespaces = new HashSet<String>();
686    namespaces.add(ns1);
687
688    TableName tab1 = TableName.valueOf("ns2:tabl");
689    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
690    tableCfs.put(tab1, new ArrayList<String>());
691
692    try {
693      rpc.setNamespaces(namespaces);
694      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
695      fail("Should throw Exception."
696        + " When replicate all flag is true, no need to config namespaces");
697    } catch (IOException e) {
698      // OK
699      rpc.setNamespaces(null);
700    }
701
702    try {
703      rpc.setTableCFsMap(tableCfs);
704      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
705      fail("Should throw Exception."
706        + " When replicate all flag is true, no need to config table-cfs");
707    } catch (IOException e) {
708      // OK
709      rpc.setTableCFsMap(null);
710    }
711
712    // Set replicate_all flag to true
713    rpc.setReplicateAllUserTables(false);
714    try {
715      rpc.setExcludeNamespaces(namespaces);
716      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
717      fail("Should throw Exception."
718        + " When replicate all flag is false, no need to config exclude namespaces");
719    } catch (IOException e) {
720      // OK
721      rpc.setExcludeNamespaces(null);
722    }
723
724    try {
725      rpc.setExcludeTableCFsMap(tableCfs);
726      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
727      fail("Should throw Exception."
728        + " When replicate all flag is false, no need to config exclude table-cfs");
729    } catch (IOException e) {
730      // OK
731      rpc.setExcludeTableCFsMap(null);
732    }
733
734    rpc.setNamespaces(namespaces);
735    rpc.setTableCFsMap(tableCfs);
736    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
737    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
738
739    // Default replicate_all flag is true
740    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
741    rpc2.setClusterKey(KEY_SECOND);
742    rpc2.setExcludeNamespaces(namespaces);
743    rpc2.setExcludeTableCFsMap(tableCfs);
744    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
745    // table-cfs config
746    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
747
748    hbaseAdmin.removeReplicationPeer(ID_ONE);
749    hbaseAdmin.removeReplicationPeer(ID_SECOND);
750  }
751
752  @Test
753  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
754    String ns1 = "ns1";
755    String ns2 = "ns2";
756    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
757    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
758
759    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
760    rpc.setClusterKey(KEY_ONE);
761    rpc.setReplicateAllUserTables(false);
762    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
763
764    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
765    Set<String> namespaces = new HashSet<String>();
766    namespaces.add(ns1);
767    rpc.setNamespaces(namespaces);
768    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
769    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
770    try {
771      Map<TableName, List<String>> tableCfs = new HashMap<>();
772      tableCfs.put(tableName1, new ArrayList<>());
773      rpc.setTableCFsMap(tableCfs);
774      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
775      fail("Should throw ReplicationException" + " Because table " + tableName1
776        + " conflict with namespace " + ns1);
777    } catch (Exception e) {
778      // OK
779    }
780
781    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
782    Map<TableName, List<String>> tableCfs = new HashMap<>();
783    tableCfs.put(tableName2, new ArrayList<>());
784    rpc.setTableCFsMap(tableCfs);
785    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
786    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
787    try {
788      namespaces.clear();
789      namespaces.add(ns2);
790      rpc.setNamespaces(namespaces);
791      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
792      fail("Should throw ReplicationException" + " Because namespace " + ns2
793        + " conflict with table " + tableName2);
794    } catch (Exception e) {
795      // OK
796    }
797
798    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
799    rpc2.setClusterKey(KEY_SECOND);
800    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
801
802    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
803    Set<String> excludeNamespaces = new HashSet<String>();
804    excludeNamespaces.add(ns1);
805    rpc2.setExcludeNamespaces(excludeNamespaces);
806    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
807    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
808    try {
809      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
810      excludeTableCfs.put(tableName1, new ArrayList<>());
811      rpc2.setExcludeTableCFsMap(excludeTableCfs);
812      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
813      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
814        + " conflict with exclude namespace " + ns1);
815    } catch (Exception e) {
816      // OK
817    }
818
819    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
820    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
821    excludeTableCfs.put(tableName2, new ArrayList<>());
822    rpc2.setExcludeTableCFsMap(excludeTableCfs);
823    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
824    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
825    try {
826      namespaces.clear();
827      namespaces.add(ns2);
828      rpc2.setNamespaces(namespaces);
829      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
830      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
831        + " conflict with exclude table " + tableName2);
832    } catch (Exception e) {
833      // OK
834    }
835
836    hbaseAdmin.removeReplicationPeer(ID_ONE);
837    hbaseAdmin.removeReplicationPeer(ID_SECOND);
838  }
839
840  @Test
841  public void testPeerBandwidth() throws Exception {
842    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
843    rpc.setClusterKey(KEY_ONE);
844    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
845
846    rpc = admin.getPeerConfig(ID_ONE);
847    assertEquals(0, rpc.getBandwidth());
848
849    rpc.setBandwidth(2097152);
850    admin.updatePeerConfig(ID_ONE, rpc);
851
852    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
853    admin.removePeer(ID_ONE);
854  }
855
856  @Test
857  public void testPeerClusterKey() throws Exception {
858    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
859    builder.setClusterKey(KEY_ONE);
860    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
861
862    try {
863      builder.setClusterKey(KEY_SECOND);
864      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
865      fail("Change cluster key on an existing peer is not allowed");
866    } catch (Exception e) {
867      // OK
868    }
869  }
870
871  @Test
872  public void testPeerReplicationEndpointImpl() throws Exception {
873    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
874    builder.setClusterKey(KEY_ONE);
875    builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
876    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
877
878    try {
879      builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
880      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
881      fail("Change replication endpoint implementation class on an existing peer is not allowed");
882    } catch (Exception e) {
883      // OK
884    }
885
886    try {
887      builder = ReplicationPeerConfig.newBuilder();
888      builder.setClusterKey(KEY_ONE);
889      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
890      fail("Change replication endpoint implementation class on an existing peer is not allowed");
891    } catch (Exception e) {
892      // OK
893    }
894
895    builder = ReplicationPeerConfig.newBuilder();
896    builder.setClusterKey(KEY_SECOND);
897    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
898
899    try {
900      builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
901      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
902      fail("Change replication endpoint implementation class on an existing peer is not allowed");
903    } catch (Exception e) {
904      // OK
905    }
906  }
907}