001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.regex.Pattern;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Admin;
044import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
045import org.apache.hadoop.hbase.replication.ReplicationException;
046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
047import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
051import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
052import org.apache.hadoop.hbase.testclassification.ClientTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.junit.After;
055import org.junit.AfterClass;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * Unit testing of ReplicationAdmin
067 */
068@Category({MediumTests.class, ClientTests.class})
069public class TestReplicationAdmin {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073      HBaseClassTestRule.forClass(TestReplicationAdmin.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);
076
077  private final static HBaseTestingUtility TEST_UTIL =
078      new HBaseTestingUtility();
079
080  private final String ID_ONE = "1";
081  private static String KEY_ONE;
082  private final String ID_SECOND = "2";
083  private static String KEY_SECOND;
084
085  private static ReplicationAdmin admin;
086  private static Admin hbaseAdmin;
087
088  @Rule
089  public TestName name = new TestName();
090
091  /**
092   * @throws java.lang.Exception
093   */
094  @BeforeClass
095  public static void setUpBeforeClass() throws Exception {
096    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
097    TEST_UTIL.startMiniCluster();
098    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
099    hbaseAdmin = TEST_UTIL.getAdmin();
100    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
101    KEY_SECOND = TEST_UTIL.getClusterKey() + "-test2";
102  }
103
104  @AfterClass
105  public static void tearDownAfterClass() throws Exception {
106    if (admin != null) {
107      admin.close();
108    }
109    TEST_UTIL.shutdownMiniCluster();
110  }
111
112  @After
113  public void tearDown() throws Exception {
114    for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
115      hbaseAdmin.removeReplicationPeer(desc.getPeerId());
116    }
117    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
118        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
119    for (ServerName serverName : queueStorage.getListOfReplicators()) {
120      for (String queue : queueStorage.getAllQueues(serverName)) {
121        queueStorage.removeQueue(serverName, queue);
122      }
123      queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
124    }
125  }
126
127  @Test
128  public void testConcurrentPeerOperations() throws Exception {
129    int threadNum = 5;
130    AtomicLong successCount = new AtomicLong(0);
131
132    // Test concurrent add peer operation
133    Thread[] addPeers = new Thread[threadNum];
134    for (int i = 0; i < threadNum; i++) {
135      addPeers[i] = new Thread(() -> {
136        try {
137          hbaseAdmin.addReplicationPeer(ID_ONE,
138            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
139          successCount.incrementAndGet();
140        } catch (Exception e) {
141          LOG.debug("Got exception when add replication peer", e);
142        }
143      });
144      addPeers[i].start();
145    }
146    for (Thread addPeer : addPeers) {
147      addPeer.join();
148    }
149    assertEquals(1, successCount.get());
150
151    // Test concurrent remove peer operation
152    successCount.set(0);
153    Thread[] removePeers = new Thread[threadNum];
154    for (int i = 0; i < threadNum; i++) {
155      removePeers[i] = new Thread(() -> {
156        try {
157          hbaseAdmin.removeReplicationPeer(ID_ONE);
158          successCount.incrementAndGet();
159        } catch (Exception e) {
160          LOG.debug("Got exception when remove replication peer", e);
161        }
162      });
163      removePeers[i].start();
164    }
165    for (Thread removePeer : removePeers) {
166      removePeer.join();
167    }
168    assertEquals(1, successCount.get());
169
170    // Test concurrent add peer operation again
171    successCount.set(0);
172    addPeers = new Thread[threadNum];
173    for (int i = 0; i < threadNum; i++) {
174      addPeers[i] = new Thread(() -> {
175        try {
176          hbaseAdmin.addReplicationPeer(ID_ONE,
177            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
178          successCount.incrementAndGet();
179        } catch (Exception e) {
180          LOG.debug("Got exception when add replication peer", e);
181        }
182      });
183      addPeers[i].start();
184    }
185    for (Thread addPeer : addPeers) {
186      addPeer.join();
187    }
188    assertEquals(1, successCount.get());
189  }
190
191  @Test
192  public void testAddInvalidPeer() {
193    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
194    builder.setClusterKey(KEY_ONE);
195    try {
196      String invalidPeerId = "1-2";
197      hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build());
198      fail("Should fail as the peer id: " + invalidPeerId + " is invalid");
199    } catch (Exception e) {
200      // OK
201    }
202
203    try {
204      String invalidClusterKey = "2181:/hbase";
205      builder.setClusterKey(invalidClusterKey);
206      hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
207      fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid");
208    } catch (Exception e) {
209      // OK
210    }
211  }
212
213  /**
214   * Simple testing of adding and removing peers, basically shows that
215   * all interactions with ZK work
216   * @throws Exception
217   */
218  @Test
219  public void testAddRemovePeer() throws Exception {
220    ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
221    rpc1.setClusterKey(KEY_ONE);
222    ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
223    rpc2.setClusterKey(KEY_SECOND);
224    // Add a valid peer
225    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
226    // try adding the same (fails)
227    try {
228      hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
229    } catch (Exception e) {
230      // OK!
231    }
232    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
233    // Try to remove an inexisting peer
234    try {
235      hbaseAdmin.removeReplicationPeer(ID_SECOND);
236      fail();
237    } catch (Exception e) {
238      // OK!
239    }
240    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
241    // Add a second since multi-slave is supported
242    try {
243      hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
244    } catch (Exception e) {
245      fail();
246    }
247    assertEquals(2, hbaseAdmin.listReplicationPeers().size());
248    // Remove the first peer we added
249    hbaseAdmin.removeReplicationPeer(ID_ONE);
250    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
251    hbaseAdmin.removeReplicationPeer(ID_SECOND);
252    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
253  }
254
255  @Test
256  public void testAddPeerWithState() throws Exception {
257    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
258    rpc1.setClusterKey(KEY_ONE);
259    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
260    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
261    hbaseAdmin.removeReplicationPeer(ID_ONE);
262
263    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
264    rpc2.setClusterKey(KEY_SECOND);
265    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
266    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
267    hbaseAdmin.removeReplicationPeer(ID_SECOND);
268  }
269
270  /**
271   * Tests that the peer configuration used by ReplicationAdmin contains all
272   * the peer's properties.
273   */
274  @Test
275  public void testPeerConfig() throws Exception {
276    ReplicationPeerConfig config = new ReplicationPeerConfig();
277    config.setClusterKey(KEY_ONE);
278    config.getConfiguration().put("key1", "value1");
279    config.getConfiguration().put("key2", "value2");
280    hbaseAdmin.addReplicationPeer(ID_ONE, config);
281
282    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
283    assertEquals(1, peers.size());
284    ReplicationPeerDescription peerOne = peers.get(0);
285    assertNotNull(peerOne);
286    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
287    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
288
289    hbaseAdmin.removeReplicationPeer(ID_ONE);
290  }
291
292  @Test
293  public void testAddPeerWithUnDeletedQueues() throws Exception {
294    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
295    rpc1.setClusterKey(KEY_ONE);
296    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
297    rpc2.setClusterKey(KEY_SECOND);
298    Configuration conf = TEST_UTIL.getConfiguration();
299    ReplicationQueueStorage queueStorage =
300      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
301
302    ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
303    // add queue for ID_ONE
304    queueStorage.addWAL(serverName, ID_ONE, "file1");
305    try {
306      admin.addPeer(ID_ONE, rpc1, null);
307      fail();
308    } catch (Exception e) {
309      // OK!
310    }
311    queueStorage.removeQueue(serverName, ID_ONE);
312    assertEquals(0, queueStorage.getAllQueues(serverName).size());
313
314    // add recovered queue for ID_ONE
315    queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
316    try {
317      admin.addPeer(ID_ONE, rpc2, null);
318      fail();
319    } catch (Exception e) {
320      // OK!
321    }
322  }
323
324  /**
325   * basic checks that when we add a peer that it is enabled, and that we can disable
326   * @throws Exception
327   */
328  @Test
329  public void testEnableDisable() throws Exception {
330    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
331    rpc1.setClusterKey(KEY_ONE);
332    admin.addPeer(ID_ONE, rpc1, null);
333    assertEquals(1, admin.getPeersCount());
334    assertTrue(admin.getPeerState(ID_ONE));
335    admin.disablePeer(ID_ONE);
336
337    assertFalse(admin.getPeerState(ID_ONE));
338    try {
339      admin.getPeerState(ID_SECOND);
340    } catch (ReplicationPeerNotFoundException e) {
341      // OK!
342    }
343    admin.removePeer(ID_ONE);
344  }
345
346  @Test
347  public void testAppendPeerTableCFs() throws Exception {
348    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
349    rpc.setClusterKey(KEY_ONE);
350    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
351    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
352    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
353    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
354    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
355    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
356
357    // Add a valid peer
358    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
359
360    // Update peer config, not replicate all user tables
361    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
362    rpc.setReplicateAllUserTables(false);
363    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
364
365    Map<TableName, List<String>> tableCFs = new HashMap<>();
366    tableCFs.put(tableName1, null);
367    admin.appendPeerTableCFs(ID_ONE, tableCFs);
368    Map<TableName, List<String>> result =
369      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
370    assertEquals(1, result.size());
371    assertEquals(true, result.containsKey(tableName1));
372    assertNull(result.get(tableName1));
373
374    // append table t2 to replication
375    tableCFs.clear();
376    tableCFs.put(tableName2, null);
377    admin.appendPeerTableCFs(ID_ONE, tableCFs);
378    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
379    assertEquals(2, result.size());
380    assertTrue("Should contain t1", result.containsKey(tableName1));
381    assertTrue("Should contain t2", result.containsKey(tableName2));
382    assertNull(result.get(tableName1));
383    assertNull(result.get(tableName2));
384
385    // append table column family: f1 of t3 to replication
386    tableCFs.clear();
387    tableCFs.put(tableName3, new ArrayList<>());
388    tableCFs.get(tableName3).add("f1");
389    admin.appendPeerTableCFs(ID_ONE, tableCFs);
390    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
391    assertEquals(3, result.size());
392    assertTrue("Should contain t1", result.containsKey(tableName1));
393    assertTrue("Should contain t2", result.containsKey(tableName2));
394    assertTrue("Should contain t3", result.containsKey(tableName3));
395    assertNull(result.get(tableName1));
396    assertNull(result.get(tableName2));
397    assertEquals(1, result.get(tableName3).size());
398    assertEquals("f1", result.get(tableName3).get(0));
399
400    tableCFs.clear();
401    tableCFs.put(tableName4, new ArrayList<>());
402    tableCFs.get(tableName4).add("f1");
403    tableCFs.get(tableName4).add("f2");
404    admin.appendPeerTableCFs(ID_ONE, tableCFs);
405    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
406    assertEquals(4, result.size());
407    assertTrue("Should contain t1", result.containsKey(tableName1));
408    assertTrue("Should contain t2", result.containsKey(tableName2));
409    assertTrue("Should contain t3", result.containsKey(tableName3));
410    assertTrue("Should contain t4", result.containsKey(tableName4));
411    assertNull(result.get(tableName1));
412    assertNull(result.get(tableName2));
413    assertEquals(1, result.get(tableName3).size());
414    assertEquals("f1", result.get(tableName3).get(0));
415    assertEquals(2, result.get(tableName4).size());
416    assertEquals("f1", result.get(tableName4).get(0));
417    assertEquals("f2", result.get(tableName4).get(1));
418
419    // append "table5" => [], then append "table5" => ["f1"]
420    tableCFs.clear();
421    tableCFs.put(tableName5, new ArrayList<>());
422    admin.appendPeerTableCFs(ID_ONE, tableCFs);
423    tableCFs.clear();
424    tableCFs.put(tableName5, new ArrayList<>());
425    tableCFs.get(tableName5).add("f1");
426    admin.appendPeerTableCFs(ID_ONE, tableCFs);
427    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
428    assertEquals(5, result.size());
429    assertTrue("Should contain t5", result.containsKey(tableName5));
430    // null means replication all cfs of tab5
431    assertNull(result.get(tableName5));
432
433    // append "table6" => ["f1"], then append "table6" => []
434    tableCFs.clear();
435    tableCFs.put(tableName6, new ArrayList<>());
436    tableCFs.get(tableName6).add("f1");
437    admin.appendPeerTableCFs(ID_ONE, tableCFs);
438    tableCFs.clear();
439    tableCFs.put(tableName6, new ArrayList<>());
440    admin.appendPeerTableCFs(ID_ONE, tableCFs);
441    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
442    assertEquals(6, result.size());
443    assertTrue("Should contain t6", result.containsKey(tableName6));
444    // null means replication all cfs of tab6
445    assertNull(result.get(tableName6));
446
447    admin.removePeer(ID_ONE);
448  }
449
450  @Test
451  public void testRemovePeerTableCFs() throws Exception {
452    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
453    rpc.setClusterKey(KEY_ONE);
454    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
455    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
456    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
457    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
458
459    // Add a valid peer
460    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
461
462    // Update peer config, not replicate all user tables
463    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
464    rpc.setReplicateAllUserTables(false);
465    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
466
467    Map<TableName, List<String>> tableCFs = new HashMap<>();
468    try {
469      tableCFs.put(tableName3, null);
470      admin.removePeerTableCFs(ID_ONE, tableCFs);
471      assertTrue(false);
472    } catch (ReplicationException e) {
473    }
474    assertNull(admin.getPeerTableCFs(ID_ONE));
475
476    tableCFs.clear();
477    tableCFs.put(tableName1, null);
478    tableCFs.put(tableName2, new ArrayList<>());
479    tableCFs.get(tableName2).add("cf1");
480    admin.setPeerTableCFs(ID_ONE, tableCFs);
481    try {
482      tableCFs.clear();
483      tableCFs.put(tableName3, null);
484      admin.removePeerTableCFs(ID_ONE, tableCFs);
485      assertTrue(false);
486    } catch (ReplicationException e) {
487    }
488    Map<TableName, List<String>> result =
489      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
490    assertEquals(2, result.size());
491    assertTrue("Should contain t1", result.containsKey(tableName1));
492    assertTrue("Should contain t2", result.containsKey(tableName2));
493    assertNull(result.get(tableName1));
494    assertEquals(1, result.get(tableName2).size());
495    assertEquals("cf1", result.get(tableName2).get(0));
496
497    try {
498      tableCFs.clear();
499      tableCFs.put(tableName1, new ArrayList<>());
500      tableCFs.get(tableName1).add("f1");
501      admin.removePeerTableCFs(ID_ONE, tableCFs);
502      assertTrue(false);
503    } catch (ReplicationException e) {
504    }
505    tableCFs.clear();
506    tableCFs.put(tableName1, null);
507    admin.removePeerTableCFs(ID_ONE, tableCFs);
508    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
509    assertEquals(1, result.size());
510    assertEquals(1, result.get(tableName2).size());
511    assertEquals("cf1", result.get(tableName2).get(0));
512
513    try {
514      tableCFs.clear();
515      tableCFs.put(tableName2, null);
516      admin.removePeerTableCFs(ID_ONE, tableCFs);
517      fail();
518    } catch (ReplicationException e) {
519    }
520    tableCFs.clear();
521    tableCFs.put(tableName2, new ArrayList<>());
522    tableCFs.get(tableName2).add("cf1");
523    admin.removePeerTableCFs(ID_ONE, tableCFs);
524    assertNull(admin.getPeerTableCFs(ID_ONE));
525
526    tableCFs.clear();
527    tableCFs.put(tableName4, new ArrayList<>());
528    admin.setPeerTableCFs(ID_ONE, tableCFs);
529    admin.removePeerTableCFs(ID_ONE, tableCFs);
530    assertNull(admin.getPeerTableCFs(ID_ONE));
531
532    admin.removePeer(ID_ONE);
533  }
534
535  @Test
536  public void testSetPeerNamespaces() throws Exception {
537    String ns1 = "ns1";
538    String ns2 = "ns2";
539
540    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
541    rpc.setClusterKey(KEY_ONE);
542    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
543
544    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
545    rpc.setReplicateAllUserTables(false);
546    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
547
548    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
549    Set<String> namespaces = new HashSet<>();
550    namespaces.add(ns1);
551    namespaces.add(ns2);
552    rpc.setNamespaces(namespaces);
553    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
554    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
555    assertEquals(2, namespaces.size());
556    assertTrue(namespaces.contains(ns1));
557    assertTrue(namespaces.contains(ns2));
558
559    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
560    namespaces = new HashSet<>();
561    namespaces.add(ns1);
562    rpc.setNamespaces(namespaces);
563    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
564    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
565    assertEquals(1, namespaces.size());
566    assertTrue(namespaces.contains(ns1));
567
568    hbaseAdmin.removeReplicationPeer(ID_ONE);
569  }
570
571  @Test
572  public void testSetReplicateAllUserTables() throws Exception {
573    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
574    rpc.setClusterKey(KEY_ONE);
575    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
576
577    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
578    assertTrue(rpc.replicateAllUserTables());
579
580    rpc.setReplicateAllUserTables(false);
581    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
582    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
583    assertFalse(rpc.replicateAllUserTables());
584
585    rpc.setReplicateAllUserTables(true);
586    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
587    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
588    assertTrue(rpc.replicateAllUserTables());
589
590    hbaseAdmin.removeReplicationPeer(ID_ONE);
591  }
592
593  @Test
594  public void testPeerExcludeNamespaces() throws Exception {
595    String ns1 = "ns1";
596    String ns2 = "ns2";
597
598    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
599    rpc.setClusterKey(KEY_ONE);
600    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
601
602    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
603    assertTrue(rpc.replicateAllUserTables());
604
605    Set<String> namespaces = new HashSet<String>();
606    namespaces.add(ns1);
607    namespaces.add(ns2);
608    rpc.setExcludeNamespaces(namespaces);
609    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
610    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
611    assertEquals(2, namespaces.size());
612    assertTrue(namespaces.contains(ns1));
613    assertTrue(namespaces.contains(ns2));
614
615    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
616    namespaces = new HashSet<String>();
617    namespaces.add(ns1);
618    rpc.setExcludeNamespaces(namespaces);
619    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
620    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
621    assertEquals(1, namespaces.size());
622    assertTrue(namespaces.contains(ns1));
623
624    hbaseAdmin.removeReplicationPeer(ID_ONE);
625  }
626
627  @Test
628  public void testPeerExcludeTableCFs() throws Exception {
629    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
630    rpc.setClusterKey(KEY_ONE);
631    TableName tab1 = TableName.valueOf("t1");
632    TableName tab2 = TableName.valueOf("t2");
633    TableName tab3 = TableName.valueOf("t3");
634    TableName tab4 = TableName.valueOf("t4");
635
636    // Add a valid peer
637    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
638    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
639    assertTrue(rpc.replicateAllUserTables());
640
641    Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
642    tableCFs.put(tab1, null);
643    rpc.setExcludeTableCFsMap(tableCFs);
644    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
645    Map<TableName, List<String>> result =
646        hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
647    assertEquals(1, result.size());
648    assertEquals(true, result.containsKey(tab1));
649    assertNull(result.get(tab1));
650
651    tableCFs.put(tab2, new ArrayList<String>());
652    tableCFs.get(tab2).add("f1");
653    rpc.setExcludeTableCFsMap(tableCFs);
654    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
655    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
656    assertEquals(2, result.size());
657    assertTrue("Should contain t1", result.containsKey(tab1));
658    assertTrue("Should contain t2", result.containsKey(tab2));
659    assertNull(result.get(tab1));
660    assertEquals(1, result.get(tab2).size());
661    assertEquals("f1", result.get(tab2).get(0));
662
663    tableCFs.clear();
664    tableCFs.put(tab3, new ArrayList<String>());
665    tableCFs.put(tab4, new ArrayList<String>());
666    tableCFs.get(tab4).add("f1");
667    tableCFs.get(tab4).add("f2");
668    rpc.setExcludeTableCFsMap(tableCFs);
669    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
670    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
671    assertEquals(2, result.size());
672    assertTrue("Should contain t3", result.containsKey(tab3));
673    assertTrue("Should contain t4", result.containsKey(tab4));
674    assertNull(result.get(tab3));
675    assertEquals(2, result.get(tab4).size());
676    assertEquals("f1", result.get(tab4).get(0));
677    assertEquals("f2", result.get(tab4).get(1));
678
679    hbaseAdmin.removeReplicationPeer(ID_ONE);
680  }
681
682  @Test
683  public void testPeerConfigConflict() throws Exception {
684    // Default replicate_all flag is true
685    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
686    rpc.setClusterKey(KEY_ONE);
687
688    String ns1 = "ns1";
689    Set<String> namespaces = new HashSet<String>();
690    namespaces.add(ns1);
691
692    TableName tab1 = TableName.valueOf("ns2:tabl");
693    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
694    tableCfs.put(tab1, new ArrayList<String>());
695
696    try {
697      rpc.setNamespaces(namespaces);
698      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
699      fail("Should throw Exception."
700          + " When replicate all flag is true, no need to config namespaces");
701    } catch (IOException e) {
702      // OK
703      rpc.setNamespaces(null);
704    }
705
706    try {
707      rpc.setTableCFsMap(tableCfs);
708      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
709      fail("Should throw Exception."
710          + " When replicate all flag is true, no need to config table-cfs");
711    } catch (IOException e) {
712      // OK
713      rpc.setTableCFsMap(null);
714    }
715
716    // Set replicate_all flag to true
717    rpc.setReplicateAllUserTables(false);
718    try {
719      rpc.setExcludeNamespaces(namespaces);
720      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
721      fail("Should throw Exception."
722          + " When replicate all flag is false, no need to config exclude namespaces");
723    } catch (IOException e) {
724      // OK
725      rpc.setExcludeNamespaces(null);
726    }
727
728    try {
729      rpc.setExcludeTableCFsMap(tableCfs);
730      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
731      fail("Should throw Exception."
732          + " When replicate all flag is false, no need to config exclude table-cfs");
733    } catch (IOException e) {
734      // OK
735      rpc.setExcludeTableCFsMap(null);
736    }
737
738    rpc.setNamespaces(namespaces);
739    rpc.setTableCFsMap(tableCfs);
740    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
741    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
742
743    // Default replicate_all flag is true
744    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
745    rpc2.setClusterKey(KEY_SECOND);
746    rpc2.setExcludeNamespaces(namespaces);
747    rpc2.setExcludeTableCFsMap(tableCfs);
748    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
749    // table-cfs config
750    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
751
752    hbaseAdmin.removeReplicationPeer(ID_ONE);
753    hbaseAdmin.removeReplicationPeer(ID_SECOND);
754  }
755
756  @Test
757  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
758    String ns1 = "ns1";
759    String ns2 = "ns2";
760    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
761    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
762
763    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
764    rpc.setClusterKey(KEY_ONE);
765    rpc.setReplicateAllUserTables(false);
766    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
767
768    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
769    Set<String> namespaces = new HashSet<String>();
770    namespaces.add(ns1);
771    rpc.setNamespaces(namespaces);
772    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
773    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
774    try {
775      Map<TableName, List<String>> tableCfs = new HashMap<>();
776      tableCfs.put(tableName1, new ArrayList<>());
777      rpc.setTableCFsMap(tableCfs);
778      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
779      fail("Should throw ReplicationException" + " Because table " + tableName1
780          + " conflict with namespace " + ns1);
781    } catch (Exception e) {
782      // OK
783    }
784
785    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
786    Map<TableName, List<String>> tableCfs = new HashMap<>();
787    tableCfs.put(tableName2, new ArrayList<>());
788    rpc.setTableCFsMap(tableCfs);
789    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
790    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
791    try {
792      namespaces.clear();
793      namespaces.add(ns2);
794      rpc.setNamespaces(namespaces);
795      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
796      fail("Should throw ReplicationException" + " Because namespace " + ns2
797          + " conflict with table " + tableName2);
798    } catch (Exception e) {
799      // OK
800    }
801
802    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
803    rpc2.setClusterKey(KEY_SECOND);
804    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
805
806    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
807    Set<String> excludeNamespaces = new HashSet<String>();
808    excludeNamespaces.add(ns1);
809    rpc2.setExcludeNamespaces(excludeNamespaces);
810    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
811    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
812    try {
813      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
814      excludeTableCfs.put(tableName1, new ArrayList<>());
815      rpc2.setExcludeTableCFsMap(excludeTableCfs);
816      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
817      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
818          + " conflict with exclude namespace " + ns1);
819    } catch (Exception e) {
820      // OK
821    }
822
823    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
824    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
825    excludeTableCfs.put(tableName2, new ArrayList<>());
826    rpc2.setExcludeTableCFsMap(excludeTableCfs);
827    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
828    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
829    try {
830      namespaces.clear();
831      namespaces.add(ns2);
832      rpc2.setNamespaces(namespaces);
833      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
834      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
835          + " conflict with exclude table " + tableName2);
836    } catch (Exception e) {
837      // OK
838    }
839
840    hbaseAdmin.removeReplicationPeer(ID_ONE);
841    hbaseAdmin.removeReplicationPeer(ID_SECOND);
842  }
843
844  @Test
845  public void testPeerBandwidth() throws Exception {
846    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
847    rpc.setClusterKey(KEY_ONE);
848    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
849
850    rpc = admin.getPeerConfig(ID_ONE);
851    assertEquals(0, rpc.getBandwidth());
852
853    rpc.setBandwidth(2097152);
854    admin.updatePeerConfig(ID_ONE, rpc);
855
856    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
857    admin.removePeer(ID_ONE);
858  }
859
860  @Test
861  public void testPeerClusterKey() throws Exception {
862    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
863    builder.setClusterKey(KEY_ONE);
864    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
865
866    try {
867      builder.setClusterKey(KEY_SECOND);
868      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
869      fail("Change cluster key on an existing peer is not allowed");
870    } catch (Exception e) {
871      // OK
872    }
873  }
874
875  @Test
876  public void testPeerReplicationEndpointImpl() throws Exception {
877    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
878    builder.setClusterKey(KEY_ONE);
879    builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
880    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
881
882    try {
883      builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
884      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
885      fail("Change replication endpoint implementation class on an existing peer is not allowed");
886    } catch (Exception e) {
887      // OK
888    }
889
890    try {
891      builder = ReplicationPeerConfig.newBuilder();
892      builder.setClusterKey(KEY_ONE);
893      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
894      fail("Change replication endpoint implementation class on an existing peer is not allowed");
895    } catch (Exception e) {
896      // OK
897    }
898
899    builder = ReplicationPeerConfig.newBuilder();
900    builder.setClusterKey(KEY_SECOND);
901    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
902
903    try {
904      builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
905      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
906      fail("Change replication endpoint implementation class on an existing peer is not allowed");
907    } catch (Exception e) {
908      // OK
909    }
910  }
911}