001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.regex.Pattern;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.replication.ReplicationException;
037import org.apache.hadoop.hbase.replication.ReplicationFactory;
038import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
039import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
040import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
041import org.apache.hadoop.hbase.replication.ReplicationQueues;
042import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
043import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
044import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
045import org.apache.hadoop.hbase.testclassification.ClientTests;
046import org.apache.hadoop.hbase.testclassification.MediumTests;
047import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
048import org.junit.After;
049import org.junit.AfterClass;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Rule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055import org.junit.rules.TestName;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import static org.junit.Assert.assertEquals;
060import static org.junit.Assert.assertFalse;
061import static org.junit.Assert.assertNotNull;
062import static org.junit.Assert.assertNull;
063import static org.junit.Assert.assertTrue;
064import static org.junit.Assert.fail;
065
066/**
067 * Unit testing of ReplicationAdmin
068 */
069@Category({MediumTests.class, ClientTests.class})
070public class TestReplicationAdmin {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestReplicationAdmin.class);
075
076  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);
077
078  private final static HBaseTestingUtility TEST_UTIL =
079      new HBaseTestingUtility();
080
081  private final String ID_ONE = "1";
082  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
083  private final String ID_SECOND = "2";
084  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
085
086  private static ReplicationAdmin admin;
087  private static Admin hbaseAdmin;
088
089  @Rule
090  public TestName name = new TestName();
091
092  /**
093   * @throws java.lang.Exception
094   */
095  @BeforeClass
096  public static void setUpBeforeClass() throws Exception {
097    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
098    TEST_UTIL.startMiniCluster();
099    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
100    hbaseAdmin = TEST_UTIL.getAdmin();
101  }
102
103  @AfterClass
104  public static void tearDownAfterClass() throws Exception {
105    if (admin != null) {
106      admin.close();
107    }
108    TEST_UTIL.shutdownMiniCluster();
109  }
110
111  @After
112  public void cleanupPeer() {
113    try {
114      hbaseAdmin.removeReplicationPeer(ID_ONE);
115    } catch (Exception e) {
116      LOG.debug("Replication peer " + ID_ONE + " may already be removed");
117    }
118    try {
119      hbaseAdmin.removeReplicationPeer(ID_SECOND);
120    } catch (Exception e) {
121      LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
122    }
123  }
124
125  /**
126   * Simple testing of adding and removing peers, basically shows that
127   * all interactions with ZK work
128   * @throws Exception
129   */
130  @Test
131  public void testAddRemovePeer() throws Exception {
132    ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
133    rpc1.setClusterKey(KEY_ONE);
134    ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
135    rpc2.setClusterKey(KEY_SECOND);
136    // Add a valid peer
137    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
138    // try adding the same (fails)
139    try {
140      hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
141    } catch (Exception e) {
142      // OK!
143    }
144    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
145    // Try to remove an inexisting peer
146    try {
147      hbaseAdmin.removeReplicationPeer(ID_SECOND);
148      fail();
149    } catch (Exception e) {
150      // OK!
151    }
152    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
153    // Add a second since multi-slave is supported
154    try {
155      hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
156    } catch (Exception e) {
157      fail();
158    }
159    assertEquals(2, hbaseAdmin.listReplicationPeers().size());
160    // Remove the first peer we added
161    hbaseAdmin.removeReplicationPeer(ID_ONE);
162    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
163    hbaseAdmin.removeReplicationPeer(ID_SECOND);
164    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
165  }
166
167  @Test
168  public void testAddPeerWithState() throws Exception {
169    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
170    rpc1.setClusterKey(KEY_ONE);
171    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
172    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
173    hbaseAdmin.removeReplicationPeer(ID_ONE);
174
175    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
176    rpc2.setClusterKey(KEY_SECOND);
177    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
178    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
179    hbaseAdmin.removeReplicationPeer(ID_SECOND);
180  }
181
182  /**
183   * Tests that the peer configuration used by ReplicationAdmin contains all
184   * the peer's properties.
185   */
186  @Test
187  public void testPeerConfig() throws Exception {
188    ReplicationPeerConfig config = new ReplicationPeerConfig();
189    config.setClusterKey(KEY_ONE);
190    config.getConfiguration().put("key1", "value1");
191    config.getConfiguration().put("key2", "value2");
192    hbaseAdmin.addReplicationPeer(ID_ONE, config);
193
194    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
195    assertEquals(1, peers.size());
196    ReplicationPeerDescription peerOne = peers.get(0);
197    assertNotNull(peerOne);
198    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
199    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
200
201    hbaseAdmin.removeReplicationPeer(ID_ONE);
202  }
203
204  @Test
205  public void testAddPeerWithUnDeletedQueues() throws Exception {
206    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
207    rpc1.setClusterKey(KEY_ONE);
208    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
209    rpc2.setClusterKey(KEY_SECOND);
210    Configuration conf = TEST_UTIL.getConfiguration();
211    ZKWatcher zkw = new ZKWatcher(conf, "Test HBaseAdmin", null);
212    ReplicationQueues repQueues =
213        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw));
214    repQueues.init("server1");
215
216    // add queue for ID_ONE
217    repQueues.addLog(ID_ONE, "file1");
218    try {
219      admin.addPeer(ID_ONE, rpc1, null);
220      fail();
221    } catch (Exception e) {
222      // OK!
223    }
224    repQueues.removeQueue(ID_ONE);
225    assertEquals(0, repQueues.getAllQueues().size());
226
227    // add recovered queue for ID_ONE
228    repQueues.addLog(ID_ONE + "-server2", "file1");
229    try {
230      admin.addPeer(ID_ONE, rpc2, null);
231      fail();
232    } catch (Exception e) {
233      // OK!
234    }
235    repQueues.removeAllQueues();
236    zkw.close();
237  }
238
239  /**
240   * basic checks that when we add a peer that it is enabled, and that we can disable
241   * @throws Exception
242   */
243  @Test
244  public void testEnableDisable() throws Exception {
245    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
246    rpc1.setClusterKey(KEY_ONE);
247    admin.addPeer(ID_ONE, rpc1, null);
248    assertEquals(1, admin.getPeersCount());
249    assertTrue(admin.getPeerState(ID_ONE));
250    admin.disablePeer(ID_ONE);
251
252    assertFalse(admin.getPeerState(ID_ONE));
253    try {
254      admin.getPeerState(ID_SECOND);
255    } catch (ReplicationPeerNotFoundException e) {
256      // OK!
257    }
258    admin.removePeer(ID_ONE);
259  }
260
261  @Test
262  public void testAppendPeerTableCFs() throws Exception {
263    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
264    rpc.setClusterKey(KEY_ONE);
265    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
266    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
267    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
268    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
269    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
270    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
271
272    // Add a valid peer
273    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
274
275    // Update peer config, not replicate all user tables
276    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
277    rpc.setReplicateAllUserTables(false);
278    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
279
280    Map<TableName, List<String>> tableCFs = new HashMap<>();
281    tableCFs.put(tableName1, null);
282    admin.appendPeerTableCFs(ID_ONE, tableCFs);
283    Map<TableName, List<String>> result =
284      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
285    assertEquals(1, result.size());
286    assertEquals(true, result.containsKey(tableName1));
287    assertNull(result.get(tableName1));
288
289    // append table t2 to replication
290    tableCFs.clear();
291    tableCFs.put(tableName2, null);
292    admin.appendPeerTableCFs(ID_ONE, tableCFs);
293    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
294    assertEquals(2, result.size());
295    assertTrue("Should contain t1", result.containsKey(tableName1));
296    assertTrue("Should contain t2", result.containsKey(tableName2));
297    assertNull(result.get(tableName1));
298    assertNull(result.get(tableName2));
299
300    // append table column family: f1 of t3 to replication
301    tableCFs.clear();
302    tableCFs.put(tableName3, new ArrayList<>());
303    tableCFs.get(tableName3).add("f1");
304    admin.appendPeerTableCFs(ID_ONE, tableCFs);
305    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
306    assertEquals(3, result.size());
307    assertTrue("Should contain t1", result.containsKey(tableName1));
308    assertTrue("Should contain t2", result.containsKey(tableName2));
309    assertTrue("Should contain t3", result.containsKey(tableName3));
310    assertNull(result.get(tableName1));
311    assertNull(result.get(tableName2));
312    assertEquals(1, result.get(tableName3).size());
313    assertEquals("f1", result.get(tableName3).get(0));
314
315    tableCFs.clear();
316    tableCFs.put(tableName4, new ArrayList<>());
317    tableCFs.get(tableName4).add("f1");
318    tableCFs.get(tableName4).add("f2");
319    admin.appendPeerTableCFs(ID_ONE, tableCFs);
320    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
321    assertEquals(4, result.size());
322    assertTrue("Should contain t1", result.containsKey(tableName1));
323    assertTrue("Should contain t2", result.containsKey(tableName2));
324    assertTrue("Should contain t3", result.containsKey(tableName3));
325    assertTrue("Should contain t4", result.containsKey(tableName4));
326    assertNull(result.get(tableName1));
327    assertNull(result.get(tableName2));
328    assertEquals(1, result.get(tableName3).size());
329    assertEquals("f1", result.get(tableName3).get(0));
330    assertEquals(2, result.get(tableName4).size());
331    assertEquals("f1", result.get(tableName4).get(0));
332    assertEquals("f2", result.get(tableName4).get(1));
333
334    // append "table5" => [], then append "table5" => ["f1"]
335    tableCFs.clear();
336    tableCFs.put(tableName5, new ArrayList<>());
337    admin.appendPeerTableCFs(ID_ONE, tableCFs);
338    tableCFs.clear();
339    tableCFs.put(tableName5, new ArrayList<>());
340    tableCFs.get(tableName5).add("f1");
341    admin.appendPeerTableCFs(ID_ONE, tableCFs);
342    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
343    assertEquals(5, result.size());
344    assertTrue("Should contain t5", result.containsKey(tableName5));
345    // null means replication all cfs of tab5
346    assertNull(result.get(tableName5));
347
348    // append "table6" => ["f1"], then append "table6" => []
349    tableCFs.clear();
350    tableCFs.put(tableName6, new ArrayList<>());
351    tableCFs.get(tableName6).add("f1");
352    admin.appendPeerTableCFs(ID_ONE, tableCFs);
353    tableCFs.clear();
354    tableCFs.put(tableName6, new ArrayList<>());
355    admin.appendPeerTableCFs(ID_ONE, tableCFs);
356    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
357    assertEquals(6, result.size());
358    assertTrue("Should contain t6", result.containsKey(tableName6));
359    // null means replication all cfs of tab6
360    assertNull(result.get(tableName6));
361
362    admin.removePeer(ID_ONE);
363  }
364
365  @Test
366  public void testRemovePeerTableCFs() throws Exception {
367    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
368    rpc.setClusterKey(KEY_ONE);
369    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
370    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
371    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
372    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
373
374    // Add a valid peer
375    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
376
377    // Update peer config, not replicate all user tables
378    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
379    rpc.setReplicateAllUserTables(false);
380    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
381
382    Map<TableName, List<String>> tableCFs = new HashMap<>();
383    try {
384      tableCFs.put(tableName3, null);
385      admin.removePeerTableCFs(ID_ONE, tableCFs);
386      assertTrue(false);
387    } catch (ReplicationException e) {
388    }
389    assertNull(admin.getPeerTableCFs(ID_ONE));
390
391    tableCFs.clear();
392    tableCFs.put(tableName1, null);
393    tableCFs.put(tableName2, new ArrayList<>());
394    tableCFs.get(tableName2).add("cf1");
395    admin.setPeerTableCFs(ID_ONE, tableCFs);
396    try {
397      tableCFs.clear();
398      tableCFs.put(tableName3, null);
399      admin.removePeerTableCFs(ID_ONE, tableCFs);
400      assertTrue(false);
401    } catch (ReplicationException e) {
402    }
403    Map<TableName, List<String>> result =
404      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
405    assertEquals(2, result.size());
406    assertTrue("Should contain t1", result.containsKey(tableName1));
407    assertTrue("Should contain t2", result.containsKey(tableName2));
408    assertNull(result.get(tableName1));
409    assertEquals(1, result.get(tableName2).size());
410    assertEquals("cf1", result.get(tableName2).get(0));
411
412    try {
413      tableCFs.clear();
414      tableCFs.put(tableName1, new ArrayList<>());
415      tableCFs.get(tableName1).add("f1");
416      admin.removePeerTableCFs(ID_ONE, tableCFs);
417      assertTrue(false);
418    } catch (ReplicationException e) {
419    }
420    tableCFs.clear();
421    tableCFs.put(tableName1, null);
422    admin.removePeerTableCFs(ID_ONE, tableCFs);
423    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
424    assertEquals(1, result.size());
425    assertEquals(1, result.get(tableName2).size());
426    assertEquals("cf1", result.get(tableName2).get(0));
427
428    try {
429      tableCFs.clear();
430      tableCFs.put(tableName2, null);
431      admin.removePeerTableCFs(ID_ONE, tableCFs);
432      assertTrue(false);
433    } catch (ReplicationException e) {
434    }
435    tableCFs.clear();
436    tableCFs.put(tableName2, new ArrayList<>());
437    tableCFs.get(tableName2).add("cf1");
438    admin.removePeerTableCFs(ID_ONE, tableCFs);
439    assertNull(admin.getPeerTableCFs(ID_ONE));
440
441    tableCFs.clear();
442    tableCFs.put(tableName4, new ArrayList<>());
443    admin.setPeerTableCFs(ID_ONE, tableCFs);
444    admin.removePeerTableCFs(ID_ONE, tableCFs);
445    assertNull(admin.getPeerTableCFs(ID_ONE));
446
447    admin.removePeer(ID_ONE);
448  }
449
450  @Test
451  public void testSetPeerNamespaces() throws Exception {
452    String ns1 = "ns1";
453    String ns2 = "ns2";
454
455    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
456    rpc.setClusterKey(KEY_ONE);
457    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
458
459    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
460    rpc.setReplicateAllUserTables(false);
461    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
462
463    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
464    Set<String> namespaces = new HashSet<>();
465    namespaces.add(ns1);
466    namespaces.add(ns2);
467    rpc.setNamespaces(namespaces);
468    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
469    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
470    assertEquals(2, namespaces.size());
471    assertTrue(namespaces.contains(ns1));
472    assertTrue(namespaces.contains(ns2));
473
474    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
475    namespaces = new HashSet<>();
476    namespaces.add(ns1);
477    rpc.setNamespaces(namespaces);
478    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
479    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
480    assertEquals(1, namespaces.size());
481    assertTrue(namespaces.contains(ns1));
482
483    hbaseAdmin.removeReplicationPeer(ID_ONE);
484  }
485
486  @Test
487  public void testSetReplicateAllUserTables() throws Exception {
488    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
489    rpc.setClusterKey(KEY_ONE);
490    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
491
492    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
493    assertTrue(rpc.replicateAllUserTables());
494
495    rpc.setReplicateAllUserTables(false);
496    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
497    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
498    assertFalse(rpc.replicateAllUserTables());
499
500    rpc.setReplicateAllUserTables(true);
501    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
502    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
503    assertTrue(rpc.replicateAllUserTables());
504
505    hbaseAdmin.removeReplicationPeer(ID_ONE);
506  }
507
508  @Test
509  public void testPeerExcludeNamespaces() throws Exception {
510    String ns1 = "ns1";
511    String ns2 = "ns2";
512
513    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
514    rpc.setClusterKey(KEY_ONE);
515    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
516
517    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
518    assertTrue(rpc.replicateAllUserTables());
519
520    Set<String> namespaces = new HashSet<String>();
521    namespaces.add(ns1);
522    namespaces.add(ns2);
523    rpc.setExcludeNamespaces(namespaces);
524    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
525    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
526    assertEquals(2, namespaces.size());
527    assertTrue(namespaces.contains(ns1));
528    assertTrue(namespaces.contains(ns2));
529
530    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
531    namespaces = new HashSet<String>();
532    namespaces.add(ns1);
533    rpc.setExcludeNamespaces(namespaces);
534    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
535    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
536    assertEquals(1, namespaces.size());
537    assertTrue(namespaces.contains(ns1));
538
539    hbaseAdmin.removeReplicationPeer(ID_ONE);
540  }
541
542  @Test
543  public void testPeerExcludeTableCFs() throws Exception {
544    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
545    rpc.setClusterKey(KEY_ONE);
546    TableName tab1 = TableName.valueOf("t1");
547    TableName tab2 = TableName.valueOf("t2");
548    TableName tab3 = TableName.valueOf("t3");
549    TableName tab4 = TableName.valueOf("t4");
550
551    // Add a valid peer
552    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
553    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
554    assertTrue(rpc.replicateAllUserTables());
555
556    Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
557    tableCFs.put(tab1, null);
558    rpc.setExcludeTableCFsMap(tableCFs);
559    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
560    Map<TableName, List<String>> result =
561        hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
562    assertEquals(1, result.size());
563    assertEquals(true, result.containsKey(tab1));
564    assertNull(result.get(tab1));
565
566    tableCFs.put(tab2, new ArrayList<String>());
567    tableCFs.get(tab2).add("f1");
568    rpc.setExcludeTableCFsMap(tableCFs);
569    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
570    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
571    assertEquals(2, result.size());
572    assertTrue("Should contain t1", result.containsKey(tab1));
573    assertTrue("Should contain t2", result.containsKey(tab2));
574    assertNull(result.get(tab1));
575    assertEquals(1, result.get(tab2).size());
576    assertEquals("f1", result.get(tab2).get(0));
577
578    tableCFs.clear();
579    tableCFs.put(tab3, new ArrayList<String>());
580    tableCFs.put(tab4, new ArrayList<String>());
581    tableCFs.get(tab4).add("f1");
582    tableCFs.get(tab4).add("f2");
583    rpc.setExcludeTableCFsMap(tableCFs);
584    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
585    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
586    assertEquals(2, result.size());
587    assertTrue("Should contain t3", result.containsKey(tab3));
588    assertTrue("Should contain t4", result.containsKey(tab4));
589    assertNull(result.get(tab3));
590    assertEquals(2, result.get(tab4).size());
591    assertEquals("f1", result.get(tab4).get(0));
592    assertEquals("f2", result.get(tab4).get(1));
593
594    hbaseAdmin.removeReplicationPeer(ID_ONE);
595  }
596
597  @Test
598  public void testPeerConfigConflict() throws Exception {
599    // Default replicate_all flag is true
600    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
601    rpc.setClusterKey(KEY_ONE);
602
603    String ns1 = "ns1";
604    Set<String> namespaces = new HashSet<String>();
605    namespaces.add(ns1);
606
607    TableName tab1 = TableName.valueOf("ns2:tabl");
608    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
609    tableCfs.put(tab1, new ArrayList<String>());
610
611    try {
612      rpc.setNamespaces(namespaces);
613      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
614      fail("Should throw Exception."
615          + " When replicate all flag is true, no need to config namespaces");
616    } catch (IOException e) {
617      // OK
618      rpc.setNamespaces(null);
619    }
620
621    try {
622      rpc.setTableCFsMap(tableCfs);
623      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
624      fail("Should throw Exception."
625          + " When replicate all flag is true, no need to config table-cfs");
626    } catch (IOException e) {
627      // OK
628      rpc.setTableCFsMap(null);
629    }
630
631    // Set replicate_all flag to true
632    rpc.setReplicateAllUserTables(false);
633    try {
634      rpc.setExcludeNamespaces(namespaces);
635      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
636      fail("Should throw Exception."
637          + " When replicate all flag is false, no need to config exclude namespaces");
638    } catch (IOException e) {
639      // OK
640      rpc.setExcludeNamespaces(null);
641    }
642
643    try {
644      rpc.setExcludeTableCFsMap(tableCfs);
645      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
646      fail("Should throw Exception."
647          + " When replicate all flag is false, no need to config exclude table-cfs");
648    } catch (IOException e) {
649      // OK
650      rpc.setExcludeTableCFsMap(null);
651    }
652
653    rpc.setNamespaces(namespaces);
654    rpc.setTableCFsMap(tableCfs);
655    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
656    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
657
658    // Default replicate_all flag is true
659    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
660    rpc2.setClusterKey(KEY_SECOND);
661    rpc2.setExcludeNamespaces(namespaces);
662    rpc2.setExcludeTableCFsMap(tableCfs);
663    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
664    // table-cfs config
665    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
666
667    hbaseAdmin.removeReplicationPeer(ID_ONE);
668    hbaseAdmin.removeReplicationPeer(ID_SECOND);
669  }
670
671  @Test
672  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
673    String ns1 = "ns1";
674    String ns2 = "ns2";
675    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
676    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
677
678    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
679    rpc.setClusterKey(KEY_ONE);
680    rpc.setReplicateAllUserTables(false);
681    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
682
683    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
684    Set<String> namespaces = new HashSet<String>();
685    namespaces.add(ns1);
686    rpc.setNamespaces(namespaces);
687    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
688    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
689    try {
690      Map<TableName, List<String>> tableCfs = new HashMap<>();
691      tableCfs.put(tableName1, new ArrayList<>());
692      rpc.setTableCFsMap(tableCfs);
693      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
694      fail("Should throw ReplicationException" + " Because table " + tableName1
695          + " conflict with namespace " + ns1);
696    } catch (Exception e) {
697      // OK
698    }
699
700    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
701    Map<TableName, List<String>> tableCfs = new HashMap<>();
702    tableCfs.put(tableName2, new ArrayList<>());
703    rpc.setTableCFsMap(tableCfs);
704    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
705    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
706    try {
707      namespaces.clear();
708      namespaces.add(ns2);
709      rpc.setNamespaces(namespaces);
710      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
711      fail("Should throw ReplicationException" + " Because namespace " + ns2
712          + " conflict with table " + tableName2);
713    } catch (Exception e) {
714      // OK
715    }
716
717    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
718    rpc2.setClusterKey(KEY_SECOND);
719    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
720
721    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
722    Set<String> excludeNamespaces = new HashSet<String>();
723    excludeNamespaces.add(ns1);
724    rpc2.setExcludeNamespaces(excludeNamespaces);
725    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
726    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
727    try {
728      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
729      excludeTableCfs.put(tableName1, new ArrayList<>());
730      rpc2.setExcludeTableCFsMap(excludeTableCfs);
731      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
732      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
733          + " conflict with exclude namespace " + ns1);
734    } catch (Exception e) {
735      // OK
736    }
737
738    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
739    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
740    excludeTableCfs.put(tableName2, new ArrayList<>());
741    rpc2.setExcludeTableCFsMap(excludeTableCfs);
742    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
743    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
744    try {
745      namespaces.clear();
746      namespaces.add(ns2);
747      rpc2.setNamespaces(namespaces);
748      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
749      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
750          + " conflict with exclude table " + tableName2);
751    } catch (Exception e) {
752      // OK
753    }
754
755    hbaseAdmin.removeReplicationPeer(ID_ONE);
756    hbaseAdmin.removeReplicationPeer(ID_SECOND);
757  }
758
759  @Test
760  public void testPeerBandwidth() throws Exception {
761    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
762    rpc.setClusterKey(KEY_ONE);
763    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
764
765    rpc = admin.getPeerConfig(ID_ONE);
766    assertEquals(0, rpc.getBandwidth());
767
768    rpc.setBandwidth(2097152);
769    admin.updatePeerConfig(ID_ONE, rpc);
770
771    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
772    admin.removePeer(ID_ONE);
773  }
774
775  @Test
776  public void testPeerClusterKey() throws Exception {
777    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
778    builder.setClusterKey(KEY_ONE);
779    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
780
781    try {
782      builder.setClusterKey(KEY_SECOND);
783      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
784      fail("Change cluster key on an existing peer is not allowed");
785    } catch (Exception e) {
786      // OK
787    }
788  }
789
790  @Test
791  public void testPeerReplicationEndpointImpl() throws Exception {
792    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
793    builder.setClusterKey(KEY_ONE);
794    builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
795    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
796
797    try {
798      builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
799      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
800      fail("Change replication endpoint implementation class on an existing peer is not allowed");
801    } catch (Exception e) {
802      // OK
803    }
804
805    try {
806      builder = ReplicationPeerConfig.newBuilder();
807      builder.setClusterKey(KEY_ONE);
808      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
809      fail("Change replication endpoint implementation class on an existing peer is not allowed");
810    } catch (Exception e) {
811      // OK
812    }
813
814    builder = ReplicationPeerConfig.newBuilder();
815    builder.setClusterKey(KEY_SECOND);
816    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
817
818    try {
819      builder.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
820      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
821      fail("Change replication endpoint implementation class on an existing peer is not allowed");
822    } catch (Exception e) {
823      // OK
824    }
825  }
826}