001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.List;
032import java.util.Map;
033import java.util.Set;
034import java.util.concurrent.atomic.AtomicLong;
035import java.util.regex.Pattern;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Admin;
044import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
045import org.apache.hadoop.hbase.replication.ReplicationException;
046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
047import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
048import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
051import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
052import org.apache.hadoop.hbase.testclassification.ClientTests;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.junit.After;
055import org.junit.AfterClass;
056import org.junit.BeforeClass;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * Unit testing of ReplicationAdmin
067 */
068@Category({MediumTests.class, ClientTests.class})
069public class TestReplicationAdmin {
070
071  @ClassRule
072  public static final HBaseClassTestRule CLASS_RULE =
073      HBaseClassTestRule.forClass(TestReplicationAdmin.class);
074
075  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);
076
077  private final static HBaseTestingUtility TEST_UTIL =
078      new HBaseTestingUtility();
079
080  private final String ID_ONE = "1";
081  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
082  private final String ID_SECOND = "2";
083  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
084
085  private static ReplicationAdmin admin;
086  private static Admin hbaseAdmin;
087
088  @Rule
089  public TestName name = new TestName();
090
091  /**
092   * @throws java.lang.Exception
093   */
094  @BeforeClass
095  public static void setUpBeforeClass() throws Exception {
096    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
097    TEST_UTIL.startMiniCluster();
098    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
099    hbaseAdmin = TEST_UTIL.getAdmin();
100  }
101
102  @AfterClass
103  public static void tearDownAfterClass() throws Exception {
104    if (admin != null) {
105      admin.close();
106    }
107    TEST_UTIL.shutdownMiniCluster();
108  }
109
110  @After
111  public void tearDown() throws Exception {
112    for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
113      hbaseAdmin.removeReplicationPeer(desc.getPeerId());
114    }
115    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
116        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
117    for (ServerName serverName : queueStorage.getListOfReplicators()) {
118      for (String queue : queueStorage.getAllQueues(serverName)) {
119        queueStorage.removeQueue(serverName, queue);
120      }
121      queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
122    }
123  }
124
125  @Test
126  public void testConcurrentPeerOperations() throws Exception {
127    int threadNum = 5;
128    AtomicLong successCount = new AtomicLong(0);
129
130    // Test concurrent add peer operation
131    Thread[] addPeers = new Thread[threadNum];
132    for (int i = 0; i < threadNum; i++) {
133      addPeers[i] = new Thread(() -> {
134        try {
135          hbaseAdmin.addReplicationPeer(ID_ONE,
136            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
137          successCount.incrementAndGet();
138        } catch (Exception e) {
139          LOG.debug("Got exception when add replication peer", e);
140        }
141      });
142      addPeers[i].start();
143    }
144    for (Thread addPeer : addPeers) {
145      addPeer.join();
146    }
147    assertEquals(1, successCount.get());
148
149    // Test concurrent remove peer operation
150    successCount.set(0);
151    Thread[] removePeers = new Thread[threadNum];
152    for (int i = 0; i < threadNum; i++) {
153      removePeers[i] = new Thread(() -> {
154        try {
155          hbaseAdmin.removeReplicationPeer(ID_ONE);
156          successCount.incrementAndGet();
157        } catch (Exception e) {
158          LOG.debug("Got exception when remove replication peer", e);
159        }
160      });
161      removePeers[i].start();
162    }
163    for (Thread removePeer : removePeers) {
164      removePeer.join();
165    }
166    assertEquals(1, successCount.get());
167
168    // Test concurrent add peer operation again
169    successCount.set(0);
170    addPeers = new Thread[threadNum];
171    for (int i = 0; i < threadNum; i++) {
172      addPeers[i] = new Thread(() -> {
173        try {
174          hbaseAdmin.addReplicationPeer(ID_ONE,
175            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
176          successCount.incrementAndGet();
177        } catch (Exception e) {
178          LOG.debug("Got exception when add replication peer", e);
179        }
180      });
181      addPeers[i].start();
182    }
183    for (Thread addPeer : addPeers) {
184      addPeer.join();
185    }
186    assertEquals(1, successCount.get());
187  }
188
189  @Test
190  public void testAddInvalidPeer() {
191    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
192    builder.setClusterKey(KEY_ONE);
193    try {
194      String invalidPeerId = "1-2";
195      hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build());
196      fail("Should fail as the peer id: " + invalidPeerId + " is invalid");
197    } catch (Exception e) {
198      // OK
199    }
200
201    try {
202      String invalidClusterKey = "2181:/hbase";
203      builder.setClusterKey(invalidClusterKey);
204      hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
205      fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid");
206    } catch (Exception e) {
207      // OK
208    }
209  }
210
211  /**
212   * Simple testing of adding and removing peers, basically shows that
213   * all interactions with ZK work
214   * @throws Exception
215   */
216  @Test
217  public void testAddRemovePeer() throws Exception {
218    ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
219    rpc1.setClusterKey(KEY_ONE);
220    ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
221    rpc2.setClusterKey(KEY_SECOND);
222    // Add a valid peer
223    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
224    // try adding the same (fails)
225    try {
226      hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
227    } catch (Exception e) {
228      // OK!
229    }
230    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
231    // Try to remove an inexisting peer
232    try {
233      hbaseAdmin.removeReplicationPeer(ID_SECOND);
234      fail();
235    } catch (Exception e) {
236      // OK!
237    }
238    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
239    // Add a second since multi-slave is supported
240    try {
241      hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
242    } catch (Exception e) {
243      fail();
244    }
245    assertEquals(2, hbaseAdmin.listReplicationPeers().size());
246    // Remove the first peer we added
247    hbaseAdmin.removeReplicationPeer(ID_ONE);
248    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
249    hbaseAdmin.removeReplicationPeer(ID_SECOND);
250    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
251  }
252
253  @Test
254  public void testAddPeerWithState() throws Exception {
255    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
256    rpc1.setClusterKey(KEY_ONE);
257    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
258    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
259    hbaseAdmin.removeReplicationPeer(ID_ONE);
260
261    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
262    rpc2.setClusterKey(KEY_SECOND);
263    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
264    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
265    hbaseAdmin.removeReplicationPeer(ID_SECOND);
266  }
267
268  /**
269   * Tests that the peer configuration used by ReplicationAdmin contains all
270   * the peer's properties.
271   */
272  @Test
273  public void testPeerConfig() throws Exception {
274    ReplicationPeerConfig config = new ReplicationPeerConfig();
275    config.setClusterKey(KEY_ONE);
276    config.getConfiguration().put("key1", "value1");
277    config.getConfiguration().put("key2", "value2");
278    hbaseAdmin.addReplicationPeer(ID_ONE, config);
279
280    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
281    assertEquals(1, peers.size());
282    ReplicationPeerDescription peerOne = peers.get(0);
283    assertNotNull(peerOne);
284    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
285    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
286
287    hbaseAdmin.removeReplicationPeer(ID_ONE);
288  }
289
290  @Test
291  public void testAddPeerWithUnDeletedQueues() throws Exception {
292    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
293    rpc1.setClusterKey(KEY_ONE);
294    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
295    rpc2.setClusterKey(KEY_SECOND);
296    Configuration conf = TEST_UTIL.getConfiguration();
297    ReplicationQueueStorage queueStorage =
298      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
299
300    ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
301    // add queue for ID_ONE
302    queueStorage.addWAL(serverName, ID_ONE, "file1");
303    try {
304      admin.addPeer(ID_ONE, rpc1, null);
305      fail();
306    } catch (Exception e) {
307      // OK!
308    }
309    queueStorage.removeQueue(serverName, ID_ONE);
310    assertEquals(0, queueStorage.getAllQueues(serverName).size());
311
312    // add recovered queue for ID_ONE
313    queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
314    try {
315      admin.addPeer(ID_ONE, rpc2, null);
316      fail();
317    } catch (Exception e) {
318      // OK!
319    }
320  }
321
322  /**
323   * basic checks that when we add a peer that it is enabled, and that we can disable
324   * @throws Exception
325   */
326  @Test
327  public void testEnableDisable() throws Exception {
328    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
329    rpc1.setClusterKey(KEY_ONE);
330    admin.addPeer(ID_ONE, rpc1, null);
331    assertEquals(1, admin.getPeersCount());
332    assertTrue(admin.getPeerState(ID_ONE));
333    admin.disablePeer(ID_ONE);
334
335    assertFalse(admin.getPeerState(ID_ONE));
336    try {
337      admin.getPeerState(ID_SECOND);
338    } catch (ReplicationPeerNotFoundException e) {
339      // OK!
340    }
341    admin.removePeer(ID_ONE);
342  }
343
344  @Test
345  public void testAppendPeerTableCFs() throws Exception {
346    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
347    rpc.setClusterKey(KEY_ONE);
348    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
349    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
350    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
351    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
352    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
353    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
354
355    // Add a valid peer
356    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
357
358    // Update peer config, not replicate all user tables
359    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
360    rpc.setReplicateAllUserTables(false);
361    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
362
363    Map<TableName, List<String>> tableCFs = new HashMap<>();
364    tableCFs.put(tableName1, null);
365    admin.appendPeerTableCFs(ID_ONE, tableCFs);
366    Map<TableName, List<String>> result =
367      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
368    assertEquals(1, result.size());
369    assertEquals(true, result.containsKey(tableName1));
370    assertNull(result.get(tableName1));
371
372    // append table t2 to replication
373    tableCFs.clear();
374    tableCFs.put(tableName2, null);
375    admin.appendPeerTableCFs(ID_ONE, tableCFs);
376    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
377    assertEquals(2, result.size());
378    assertTrue("Should contain t1", result.containsKey(tableName1));
379    assertTrue("Should contain t2", result.containsKey(tableName2));
380    assertNull(result.get(tableName1));
381    assertNull(result.get(tableName2));
382
383    // append table column family: f1 of t3 to replication
384    tableCFs.clear();
385    tableCFs.put(tableName3, new ArrayList<>());
386    tableCFs.get(tableName3).add("f1");
387    admin.appendPeerTableCFs(ID_ONE, tableCFs);
388    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
389    assertEquals(3, result.size());
390    assertTrue("Should contain t1", result.containsKey(tableName1));
391    assertTrue("Should contain t2", result.containsKey(tableName2));
392    assertTrue("Should contain t3", result.containsKey(tableName3));
393    assertNull(result.get(tableName1));
394    assertNull(result.get(tableName2));
395    assertEquals(1, result.get(tableName3).size());
396    assertEquals("f1", result.get(tableName3).get(0));
397
398    tableCFs.clear();
399    tableCFs.put(tableName4, new ArrayList<>());
400    tableCFs.get(tableName4).add("f1");
401    tableCFs.get(tableName4).add("f2");
402    admin.appendPeerTableCFs(ID_ONE, tableCFs);
403    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
404    assertEquals(4, result.size());
405    assertTrue("Should contain t1", result.containsKey(tableName1));
406    assertTrue("Should contain t2", result.containsKey(tableName2));
407    assertTrue("Should contain t3", result.containsKey(tableName3));
408    assertTrue("Should contain t4", result.containsKey(tableName4));
409    assertNull(result.get(tableName1));
410    assertNull(result.get(tableName2));
411    assertEquals(1, result.get(tableName3).size());
412    assertEquals("f1", result.get(tableName3).get(0));
413    assertEquals(2, result.get(tableName4).size());
414    assertEquals("f1", result.get(tableName4).get(0));
415    assertEquals("f2", result.get(tableName4).get(1));
416
417    // append "table5" => [], then append "table5" => ["f1"]
418    tableCFs.clear();
419    tableCFs.put(tableName5, new ArrayList<>());
420    admin.appendPeerTableCFs(ID_ONE, tableCFs);
421    tableCFs.clear();
422    tableCFs.put(tableName5, new ArrayList<>());
423    tableCFs.get(tableName5).add("f1");
424    admin.appendPeerTableCFs(ID_ONE, tableCFs);
425    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
426    assertEquals(5, result.size());
427    assertTrue("Should contain t5", result.containsKey(tableName5));
428    // null means replication all cfs of tab5
429    assertNull(result.get(tableName5));
430
431    // append "table6" => ["f1"], then append "table6" => []
432    tableCFs.clear();
433    tableCFs.put(tableName6, new ArrayList<>());
434    tableCFs.get(tableName6).add("f1");
435    admin.appendPeerTableCFs(ID_ONE, tableCFs);
436    tableCFs.clear();
437    tableCFs.put(tableName6, new ArrayList<>());
438    admin.appendPeerTableCFs(ID_ONE, tableCFs);
439    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
440    assertEquals(6, result.size());
441    assertTrue("Should contain t6", result.containsKey(tableName6));
442    // null means replication all cfs of tab6
443    assertNull(result.get(tableName6));
444
445    admin.removePeer(ID_ONE);
446  }
447
448  @Test
449  public void testRemovePeerTableCFs() throws Exception {
450    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
451    rpc.setClusterKey(KEY_ONE);
452    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
453    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
454    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
455    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
456
457    // Add a valid peer
458    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
459
460    // Update peer config, not replicate all user tables
461    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
462    rpc.setReplicateAllUserTables(false);
463    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
464
465    Map<TableName, List<String>> tableCFs = new HashMap<>();
466    try {
467      tableCFs.put(tableName3, null);
468      admin.removePeerTableCFs(ID_ONE, tableCFs);
469      assertTrue(false);
470    } catch (ReplicationException e) {
471    }
472    assertNull(admin.getPeerTableCFs(ID_ONE));
473
474    tableCFs.clear();
475    tableCFs.put(tableName1, null);
476    tableCFs.put(tableName2, new ArrayList<>());
477    tableCFs.get(tableName2).add("cf1");
478    admin.setPeerTableCFs(ID_ONE, tableCFs);
479    try {
480      tableCFs.clear();
481      tableCFs.put(tableName3, null);
482      admin.removePeerTableCFs(ID_ONE, tableCFs);
483      assertTrue(false);
484    } catch (ReplicationException e) {
485    }
486    Map<TableName, List<String>> result =
487      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
488    assertEquals(2, result.size());
489    assertTrue("Should contain t1", result.containsKey(tableName1));
490    assertTrue("Should contain t2", result.containsKey(tableName2));
491    assertNull(result.get(tableName1));
492    assertEquals(1, result.get(tableName2).size());
493    assertEquals("cf1", result.get(tableName2).get(0));
494
495    try {
496      tableCFs.clear();
497      tableCFs.put(tableName1, new ArrayList<>());
498      tableCFs.get(tableName1).add("f1");
499      admin.removePeerTableCFs(ID_ONE, tableCFs);
500      assertTrue(false);
501    } catch (ReplicationException e) {
502    }
503    tableCFs.clear();
504    tableCFs.put(tableName1, null);
505    admin.removePeerTableCFs(ID_ONE, tableCFs);
506    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
507    assertEquals(1, result.size());
508    assertEquals(1, result.get(tableName2).size());
509    assertEquals("cf1", result.get(tableName2).get(0));
510
511    try {
512      tableCFs.clear();
513      tableCFs.put(tableName2, null);
514      admin.removePeerTableCFs(ID_ONE, tableCFs);
515      fail();
516    } catch (ReplicationException e) {
517    }
518    tableCFs.clear();
519    tableCFs.put(tableName2, new ArrayList<>());
520    tableCFs.get(tableName2).add("cf1");
521    admin.removePeerTableCFs(ID_ONE, tableCFs);
522    assertNull(admin.getPeerTableCFs(ID_ONE));
523
524    tableCFs.clear();
525    tableCFs.put(tableName4, new ArrayList<>());
526    admin.setPeerTableCFs(ID_ONE, tableCFs);
527    admin.removePeerTableCFs(ID_ONE, tableCFs);
528    assertNull(admin.getPeerTableCFs(ID_ONE));
529
530    admin.removePeer(ID_ONE);
531  }
532
533  @Test
534  public void testSetPeerNamespaces() throws Exception {
535    String ns1 = "ns1";
536    String ns2 = "ns2";
537
538    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
539    rpc.setClusterKey(KEY_ONE);
540    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
541
542    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
543    rpc.setReplicateAllUserTables(false);
544    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
545
546    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
547    Set<String> namespaces = new HashSet<>();
548    namespaces.add(ns1);
549    namespaces.add(ns2);
550    rpc.setNamespaces(namespaces);
551    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
552    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
553    assertEquals(2, namespaces.size());
554    assertTrue(namespaces.contains(ns1));
555    assertTrue(namespaces.contains(ns2));
556
557    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
558    namespaces = new HashSet<>();
559    namespaces.add(ns1);
560    rpc.setNamespaces(namespaces);
561    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
562    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
563    assertEquals(1, namespaces.size());
564    assertTrue(namespaces.contains(ns1));
565
566    hbaseAdmin.removeReplicationPeer(ID_ONE);
567  }
568
569  @Test
570  public void testSetReplicateAllUserTables() throws Exception {
571    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
572    rpc.setClusterKey(KEY_ONE);
573    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
574
575    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
576    assertTrue(rpc.replicateAllUserTables());
577
578    rpc.setReplicateAllUserTables(false);
579    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
580    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
581    assertFalse(rpc.replicateAllUserTables());
582
583    rpc.setReplicateAllUserTables(true);
584    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
585    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
586    assertTrue(rpc.replicateAllUserTables());
587
588    hbaseAdmin.removeReplicationPeer(ID_ONE);
589  }
590
591  @Test
592  public void testPeerExcludeNamespaces() throws Exception {
593    String ns1 = "ns1";
594    String ns2 = "ns2";
595
596    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
597    rpc.setClusterKey(KEY_ONE);
598    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
599
600    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
601    assertTrue(rpc.replicateAllUserTables());
602
603    Set<String> namespaces = new HashSet<String>();
604    namespaces.add(ns1);
605    namespaces.add(ns2);
606    rpc.setExcludeNamespaces(namespaces);
607    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
608    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
609    assertEquals(2, namespaces.size());
610    assertTrue(namespaces.contains(ns1));
611    assertTrue(namespaces.contains(ns2));
612
613    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
614    namespaces = new HashSet<String>();
615    namespaces.add(ns1);
616    rpc.setExcludeNamespaces(namespaces);
617    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
618    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
619    assertEquals(1, namespaces.size());
620    assertTrue(namespaces.contains(ns1));
621
622    hbaseAdmin.removeReplicationPeer(ID_ONE);
623  }
624
625  @Test
626  public void testPeerExcludeTableCFs() throws Exception {
627    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
628    rpc.setClusterKey(KEY_ONE);
629    TableName tab1 = TableName.valueOf("t1");
630    TableName tab2 = TableName.valueOf("t2");
631    TableName tab3 = TableName.valueOf("t3");
632    TableName tab4 = TableName.valueOf("t4");
633
634    // Add a valid peer
635    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
636    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
637    assertTrue(rpc.replicateAllUserTables());
638
639    Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
640    tableCFs.put(tab1, null);
641    rpc.setExcludeTableCFsMap(tableCFs);
642    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
643    Map<TableName, List<String>> result =
644        hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
645    assertEquals(1, result.size());
646    assertEquals(true, result.containsKey(tab1));
647    assertNull(result.get(tab1));
648
649    tableCFs.put(tab2, new ArrayList<String>());
650    tableCFs.get(tab2).add("f1");
651    rpc.setExcludeTableCFsMap(tableCFs);
652    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
653    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
654    assertEquals(2, result.size());
655    assertTrue("Should contain t1", result.containsKey(tab1));
656    assertTrue("Should contain t2", result.containsKey(tab2));
657    assertNull(result.get(tab1));
658    assertEquals(1, result.get(tab2).size());
659    assertEquals("f1", result.get(tab2).get(0));
660
661    tableCFs.clear();
662    tableCFs.put(tab3, new ArrayList<String>());
663    tableCFs.put(tab4, new ArrayList<String>());
664    tableCFs.get(tab4).add("f1");
665    tableCFs.get(tab4).add("f2");
666    rpc.setExcludeTableCFsMap(tableCFs);
667    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
668    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
669    assertEquals(2, result.size());
670    assertTrue("Should contain t3", result.containsKey(tab3));
671    assertTrue("Should contain t4", result.containsKey(tab4));
672    assertNull(result.get(tab3));
673    assertEquals(2, result.get(tab4).size());
674    assertEquals("f1", result.get(tab4).get(0));
675    assertEquals("f2", result.get(tab4).get(1));
676
677    hbaseAdmin.removeReplicationPeer(ID_ONE);
678  }
679
680  @Test
681  public void testPeerConfigConflict() throws Exception {
682    // Default replicate_all flag is true
683    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
684    rpc.setClusterKey(KEY_ONE);
685
686    String ns1 = "ns1";
687    Set<String> namespaces = new HashSet<String>();
688    namespaces.add(ns1);
689
690    TableName tab1 = TableName.valueOf("ns2:tabl");
691    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
692    tableCfs.put(tab1, new ArrayList<String>());
693
694    try {
695      rpc.setNamespaces(namespaces);
696      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
697      fail("Should throw Exception."
698          + " When replicate all flag is true, no need to config namespaces");
699    } catch (IOException e) {
700      // OK
701      rpc.setNamespaces(null);
702    }
703
704    try {
705      rpc.setTableCFsMap(tableCfs);
706      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
707      fail("Should throw Exception."
708          + " When replicate all flag is true, no need to config table-cfs");
709    } catch (IOException e) {
710      // OK
711      rpc.setTableCFsMap(null);
712    }
713
714    // Set replicate_all flag to true
715    rpc.setReplicateAllUserTables(false);
716    try {
717      rpc.setExcludeNamespaces(namespaces);
718      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
719      fail("Should throw Exception."
720          + " When replicate all flag is false, no need to config exclude namespaces");
721    } catch (IOException e) {
722      // OK
723      rpc.setExcludeNamespaces(null);
724    }
725
726    try {
727      rpc.setExcludeTableCFsMap(tableCfs);
728      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
729      fail("Should throw Exception."
730          + " When replicate all flag is false, no need to config exclude table-cfs");
731    } catch (IOException e) {
732      // OK
733      rpc.setExcludeTableCFsMap(null);
734    }
735
736    rpc.setNamespaces(namespaces);
737    rpc.setTableCFsMap(tableCfs);
738    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
739    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
740
741    // Default replicate_all flag is true
742    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
743    rpc2.setClusterKey(KEY_SECOND);
744    rpc2.setExcludeNamespaces(namespaces);
745    rpc2.setExcludeTableCFsMap(tableCfs);
746    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
747    // table-cfs config
748    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
749
750    hbaseAdmin.removeReplicationPeer(ID_ONE);
751    hbaseAdmin.removeReplicationPeer(ID_SECOND);
752  }
753
754  @Test
755  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
756    String ns1 = "ns1";
757    String ns2 = "ns2";
758    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
759    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
760
761    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
762    rpc.setClusterKey(KEY_ONE);
763    rpc.setReplicateAllUserTables(false);
764    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
765
766    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
767    Set<String> namespaces = new HashSet<String>();
768    namespaces.add(ns1);
769    rpc.setNamespaces(namespaces);
770    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
771    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
772    try {
773      Map<TableName, List<String>> tableCfs = new HashMap<>();
774      tableCfs.put(tableName1, new ArrayList<>());
775      rpc.setTableCFsMap(tableCfs);
776      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
777      fail("Should throw ReplicationException" + " Because table " + tableName1
778          + " conflict with namespace " + ns1);
779    } catch (Exception e) {
780      // OK
781    }
782
783    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
784    Map<TableName, List<String>> tableCfs = new HashMap<>();
785    tableCfs.put(tableName2, new ArrayList<>());
786    rpc.setTableCFsMap(tableCfs);
787    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
788    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
789    try {
790      namespaces.clear();
791      namespaces.add(ns2);
792      rpc.setNamespaces(namespaces);
793      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
794      fail("Should throw ReplicationException" + " Because namespace " + ns2
795          + " conflict with table " + tableName2);
796    } catch (Exception e) {
797      // OK
798    }
799
800    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
801    rpc2.setClusterKey(KEY_SECOND);
802    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
803
804    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
805    Set<String> excludeNamespaces = new HashSet<String>();
806    excludeNamespaces.add(ns1);
807    rpc2.setExcludeNamespaces(excludeNamespaces);
808    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
809    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
810    try {
811      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
812      excludeTableCfs.put(tableName1, new ArrayList<>());
813      rpc2.setExcludeTableCFsMap(excludeTableCfs);
814      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
815      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
816          + " conflict with exclude namespace " + ns1);
817    } catch (Exception e) {
818      // OK
819    }
820
821    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
822    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
823    excludeTableCfs.put(tableName2, new ArrayList<>());
824    rpc2.setExcludeTableCFsMap(excludeTableCfs);
825    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
826    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
827    try {
828      namespaces.clear();
829      namespaces.add(ns2);
830      rpc2.setNamespaces(namespaces);
831      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
832      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
833          + " conflict with exclude table " + tableName2);
834    } catch (Exception e) {
835      // OK
836    }
837
838    hbaseAdmin.removeReplicationPeer(ID_ONE);
839    hbaseAdmin.removeReplicationPeer(ID_SECOND);
840  }
841
842  @Test
843  public void testPeerBandwidth() throws Exception {
844    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
845    rpc.setClusterKey(KEY_ONE);
846    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
847
848    rpc = admin.getPeerConfig(ID_ONE);
849    assertEquals(0, rpc.getBandwidth());
850
851    rpc.setBandwidth(2097152);
852    admin.updatePeerConfig(ID_ONE, rpc);
853
854    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
855    admin.removePeer(ID_ONE);
856  }
857
858  @Test
859  public void testPeerClusterKey() throws Exception {
860    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
861    builder.setClusterKey(KEY_ONE);
862    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
863
864    try {
865      builder.setClusterKey(KEY_SECOND);
866      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
867      fail("Change cluster key on an existing peer is not allowed");
868    } catch (Exception e) {
869      // OK
870    }
871  }
872
873  @Test
874  public void testPeerReplicationEndpointImpl() throws Exception {
875    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
876    builder.setClusterKey(KEY_ONE);
877    builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
878    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
879
880    try {
881      builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
882      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
883      fail("Change replication endpoint implementation class on an existing peer is not allowed");
884    } catch (Exception e) {
885      // OK
886    }
887
888    try {
889      builder = ReplicationPeerConfig.newBuilder();
890      builder.setClusterKey(KEY_ONE);
891      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
892      fail("Change replication endpoint implementation class on an existing peer is not allowed");
893    } catch (Exception e) {
894      // OK
895    }
896
897    builder = ReplicationPeerConfig.newBuilder();
898    builder.setClusterKey(KEY_SECOND);
899    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
900
901    try {
902      builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
903      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
904      fail("Change replication endpoint implementation class on an existing peer is not allowed");
905    } catch (Exception e) {
906      // OK
907    }
908  }
909}