001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025import static org.junit.Assert.fail;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.HashMap;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.atomic.AtomicLong;
036import java.util.regex.Pattern;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseTestingUtility;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
047import org.apache.hadoop.hbase.replication.ReplicationException;
048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
049import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
050import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
051import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
053import org.apache.hadoop.hbase.replication.ReplicationUtils;
054import org.apache.hadoop.hbase.replication.SyncReplicationState;
055import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
056import org.apache.hadoop.hbase.testclassification.ClientTests;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.junit.After;
060import org.junit.AfterClass;
061import org.junit.BeforeClass;
062import org.junit.ClassRule;
063import org.junit.Rule;
064import org.junit.Test;
065import org.junit.experimental.categories.Category;
066import org.junit.rules.TestName;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070/**
071 * Unit testing of ReplicationAdmin
072 */
073@Category({MediumTests.class, ClientTests.class})
074public class TestReplicationAdmin {
075
076  @ClassRule
077  public static final HBaseClassTestRule CLASS_RULE =
078      HBaseClassTestRule.forClass(TestReplicationAdmin.class);
079
080  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);
081
082  private final static HBaseTestingUtility TEST_UTIL =
083      new HBaseTestingUtility();
084
085  private final String ID_ONE = "1";
086  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
087  private final String ID_SECOND = "2";
088  private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
089
090  private static ReplicationAdmin admin;
091  private static Admin hbaseAdmin;
092
093  @Rule
094  public TestName name = new TestName();
095
096  /**
097   * @throws java.lang.Exception
098   */
099  @BeforeClass
100  public static void setUpBeforeClass() throws Exception {
101    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
102    TEST_UTIL.startMiniCluster();
103    admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
104    hbaseAdmin = TEST_UTIL.getAdmin();
105  }
106
107  @AfterClass
108  public static void tearDownAfterClass() throws Exception {
109    if (admin != null) {
110      admin.close();
111    }
112    TEST_UTIL.shutdownMiniCluster();
113  }
114
115  @After
116  public void tearDown() throws Exception {
117    for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
118      hbaseAdmin.removeReplicationPeer(desc.getPeerId());
119    }
120    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
121        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
122    for (ServerName serverName : queueStorage.getListOfReplicators()) {
123      for (String queue : queueStorage.getAllQueues(serverName)) {
124        queueStorage.removeQueue(serverName, queue);
125      }
126      queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
127    }
128  }
129
130  @Test
131  public void testConcurrentPeerOperations() throws Exception {
132    int threadNum = 5;
133    AtomicLong successCount = new AtomicLong(0);
134
135    // Test concurrent add peer operation
136    Thread[] addPeers = new Thread[threadNum];
137    for (int i = 0; i < threadNum; i++) {
138      addPeers[i] = new Thread(() -> {
139        try {
140          hbaseAdmin.addReplicationPeer(ID_ONE,
141            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
142          successCount.incrementAndGet();
143        } catch (Exception e) {
144          LOG.debug("Got exception when add replication peer", e);
145        }
146      });
147      addPeers[i].start();
148    }
149    for (Thread addPeer : addPeers) {
150      addPeer.join();
151    }
152    assertEquals(1, successCount.get());
153
154    // Test concurrent remove peer operation
155    successCount.set(0);
156    Thread[] removePeers = new Thread[threadNum];
157    for (int i = 0; i < threadNum; i++) {
158      removePeers[i] = new Thread(() -> {
159        try {
160          hbaseAdmin.removeReplicationPeer(ID_ONE);
161          successCount.incrementAndGet();
162        } catch (Exception e) {
163          LOG.debug("Got exception when remove replication peer", e);
164        }
165      });
166      removePeers[i].start();
167    }
168    for (Thread removePeer : removePeers) {
169      removePeer.join();
170    }
171    assertEquals(1, successCount.get());
172
173    // Test concurrent add peer operation again
174    successCount.set(0);
175    addPeers = new Thread[threadNum];
176    for (int i = 0; i < threadNum; i++) {
177      addPeers[i] = new Thread(() -> {
178        try {
179          hbaseAdmin.addReplicationPeer(ID_ONE,
180            ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build());
181          successCount.incrementAndGet();
182        } catch (Exception e) {
183          LOG.debug("Got exception when add replication peer", e);
184        }
185      });
186      addPeers[i].start();
187    }
188    for (Thread addPeer : addPeers) {
189      addPeer.join();
190    }
191    assertEquals(1, successCount.get());
192  }
193
194  @Test
195  public void testAddInvalidPeer() {
196    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
197    builder.setClusterKey(KEY_ONE);
198    try {
199      String invalidPeerId = "1-2";
200      hbaseAdmin.addReplicationPeer(invalidPeerId, builder.build());
201      fail("Should fail as the peer id: " + invalidPeerId + " is invalid");
202    } catch (Exception e) {
203      // OK
204    }
205
206    try {
207      String invalidClusterKey = "2181:/hbase";
208      builder.setClusterKey(invalidClusterKey);
209      hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
210      fail("Should fail as the peer cluster key: " + invalidClusterKey + " is invalid");
211    } catch (Exception e) {
212      // OK
213    }
214  }
215
216  /**
217   * Simple testing of adding and removing peers, basically shows that
218   * all interactions with ZK work
219   * @throws Exception
220   */
221  @Test
222  public void testAddRemovePeer() throws Exception {
223    ReplicationPeerConfigBuilder rpc1 = ReplicationPeerConfig.newBuilder();
224    rpc1.setClusterKey(KEY_ONE);
225    ReplicationPeerConfigBuilder rpc2 = ReplicationPeerConfig.newBuilder();
226    rpc2.setClusterKey(KEY_SECOND);
227    // Add a valid peer
228    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
229    // try adding the same (fails)
230    try {
231      hbaseAdmin.addReplicationPeer(ID_ONE, rpc1.build());
232    } catch (Exception e) {
233      // OK!
234    }
235    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
236    // Try to remove an inexisting peer
237    try {
238      hbaseAdmin.removeReplicationPeer(ID_SECOND);
239      fail();
240    } catch (Exception e) {
241      // OK!
242    }
243    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
244    // Add a second since multi-slave is supported
245    try {
246      hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2.build());
247    } catch (Exception e) {
248      fail();
249    }
250    assertEquals(2, hbaseAdmin.listReplicationPeers().size());
251    // Remove the first peer we added
252    hbaseAdmin.removeReplicationPeer(ID_ONE);
253    assertEquals(1, hbaseAdmin.listReplicationPeers().size());
254    hbaseAdmin.removeReplicationPeer(ID_SECOND);
255    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
256  }
257
258  @Test
259  public void testRemovePeerWithNonDAState() throws Exception {
260    TableName tableName = TableName.valueOf(name.getMethodName());
261    TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
262    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
263
264    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
265    TEST_UTIL.getTestFileSystem().mkdirs(new Path(rootDir, ID_ONE));
266    builder.setClusterKey(KEY_ONE);
267    builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
268      TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
269    builder.setReplicateAllUserTables(false);
270    Map<TableName, List<String>> tableCfs = new HashMap<>();
271    tableCfs.put(tableName, new ArrayList<>());
272    builder.setTableCFsMap(tableCfs);
273    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
274    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
275      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
276
277    // Transit sync replication state to ACTIVE.
278    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, SyncReplicationState.ACTIVE);
279    assertEquals(SyncReplicationState.ACTIVE,
280      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
281
282    try {
283      hbaseAdmin.removeReplicationPeer(ID_ONE);
284      fail("Can't remove a synchronous replication peer with state=ACTIVE");
285    } catch (IOException e) {
286      // OK
287    }
288
289    // Transit sync replication state to DA
290    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
291      SyncReplicationState.DOWNGRADE_ACTIVE);
292    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
293      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
294    // Transit sync replication state to STANDBY
295    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE, SyncReplicationState.STANDBY);
296    assertEquals(SyncReplicationState.STANDBY,
297      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
298
299    try {
300      hbaseAdmin.removeReplicationPeer(ID_ONE);
301      fail("Can't remove a synchronous replication peer with state=STANDBY");
302    } catch (IOException e) {
303      // OK
304    }
305
306    // Transit sync replication state to DA
307    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
308      SyncReplicationState.DOWNGRADE_ACTIVE);
309    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
310      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
311
312    hbaseAdmin.removeReplicationPeer(ID_ONE);
313    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
314  }
315
316  @Test
317  public void testAddPeerWithState() throws Exception {
318    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
319    rpc1.setClusterKey(KEY_ONE);
320    hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
321    assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
322    hbaseAdmin.removeReplicationPeer(ID_ONE);
323
324    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
325    rpc2.setClusterKey(KEY_SECOND);
326    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
327    assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
328    hbaseAdmin.removeReplicationPeer(ID_SECOND);
329  }
330
331  /**
332   * Tests that the peer configuration used by ReplicationAdmin contains all
333   * the peer's properties.
334   */
335  @Test
336  public void testPeerConfig() throws Exception {
337    ReplicationPeerConfig config = new ReplicationPeerConfig();
338    config.setClusterKey(KEY_ONE);
339    config.getConfiguration().put("key1", "value1");
340    config.getConfiguration().put("key2", "value2");
341    hbaseAdmin.addReplicationPeer(ID_ONE, config);
342
343    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
344    assertEquals(1, peers.size());
345    ReplicationPeerDescription peerOne = peers.get(0);
346    assertNotNull(peerOne);
347    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
348    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
349
350    hbaseAdmin.removeReplicationPeer(ID_ONE);
351  }
352
353  @Test
354  public void testAddPeerWithUnDeletedQueues() throws Exception {
355    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
356    rpc1.setClusterKey(KEY_ONE);
357    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
358    rpc2.setClusterKey(KEY_SECOND);
359    Configuration conf = TEST_UTIL.getConfiguration();
360    ReplicationQueueStorage queueStorage =
361      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
362
363    ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
364    // add queue for ID_ONE
365    queueStorage.addWAL(serverName, ID_ONE, "file1");
366    try {
367      admin.addPeer(ID_ONE, rpc1, null);
368      fail();
369    } catch (Exception e) {
370      // OK!
371    }
372    queueStorage.removeQueue(serverName, ID_ONE);
373    assertEquals(0, queueStorage.getAllQueues(serverName).size());
374
375    // add recovered queue for ID_ONE
376    queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
377    try {
378      admin.addPeer(ID_ONE, rpc2, null);
379      fail();
380    } catch (Exception e) {
381      // OK!
382    }
383  }
384
385  /**
386   * basic checks that when we add a peer that it is enabled, and that we can disable
387   * @throws Exception
388   */
389  @Test
390  public void testEnableDisable() throws Exception {
391    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
392    rpc1.setClusterKey(KEY_ONE);
393    admin.addPeer(ID_ONE, rpc1, null);
394    assertEquals(1, admin.getPeersCount());
395    assertTrue(admin.getPeerState(ID_ONE));
396    admin.disablePeer(ID_ONE);
397
398    assertFalse(admin.getPeerState(ID_ONE));
399    try {
400      admin.getPeerState(ID_SECOND);
401    } catch (ReplicationPeerNotFoundException e) {
402      // OK!
403    }
404    admin.removePeer(ID_ONE);
405  }
406
407  @Test
408  public void testAppendPeerTableCFs() throws Exception {
409    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
410    rpc.setClusterKey(KEY_ONE);
411    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
412    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
413    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
414    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
415    final TableName tableName5 = TableName.valueOf(name.getMethodName() + "t5");
416    final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
417
418    // Add a valid peer
419    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
420
421    // Update peer config, not replicate all user tables
422    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
423    rpc.setReplicateAllUserTables(false);
424    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
425
426    Map<TableName, List<String>> tableCFs = new HashMap<>();
427    tableCFs.put(tableName1, null);
428    admin.appendPeerTableCFs(ID_ONE, tableCFs);
429    Map<TableName, List<String>> result =
430      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
431    assertEquals(1, result.size());
432    assertEquals(true, result.containsKey(tableName1));
433    assertNull(result.get(tableName1));
434
435    // append table t2 to replication
436    tableCFs.clear();
437    tableCFs.put(tableName2, null);
438    admin.appendPeerTableCFs(ID_ONE, tableCFs);
439    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
440    assertEquals(2, result.size());
441    assertTrue("Should contain t1", result.containsKey(tableName1));
442    assertTrue("Should contain t2", result.containsKey(tableName2));
443    assertNull(result.get(tableName1));
444    assertNull(result.get(tableName2));
445
446    // append table column family: f1 of t3 to replication
447    tableCFs.clear();
448    tableCFs.put(tableName3, new ArrayList<>());
449    tableCFs.get(tableName3).add("f1");
450    admin.appendPeerTableCFs(ID_ONE, tableCFs);
451    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
452    assertEquals(3, result.size());
453    assertTrue("Should contain t1", result.containsKey(tableName1));
454    assertTrue("Should contain t2", result.containsKey(tableName2));
455    assertTrue("Should contain t3", result.containsKey(tableName3));
456    assertNull(result.get(tableName1));
457    assertNull(result.get(tableName2));
458    assertEquals(1, result.get(tableName3).size());
459    assertEquals("f1", result.get(tableName3).get(0));
460
461    tableCFs.clear();
462    tableCFs.put(tableName4, new ArrayList<>());
463    tableCFs.get(tableName4).add("f1");
464    tableCFs.get(tableName4).add("f2");
465    admin.appendPeerTableCFs(ID_ONE, tableCFs);
466    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
467    assertEquals(4, result.size());
468    assertTrue("Should contain t1", result.containsKey(tableName1));
469    assertTrue("Should contain t2", result.containsKey(tableName2));
470    assertTrue("Should contain t3", result.containsKey(tableName3));
471    assertTrue("Should contain t4", result.containsKey(tableName4));
472    assertNull(result.get(tableName1));
473    assertNull(result.get(tableName2));
474    assertEquals(1, result.get(tableName3).size());
475    assertEquals("f1", result.get(tableName3).get(0));
476    assertEquals(2, result.get(tableName4).size());
477    assertEquals("f1", result.get(tableName4).get(0));
478    assertEquals("f2", result.get(tableName4).get(1));
479
480    // append "table5" => [], then append "table5" => ["f1"]
481    tableCFs.clear();
482    tableCFs.put(tableName5, new ArrayList<>());
483    admin.appendPeerTableCFs(ID_ONE, tableCFs);
484    tableCFs.clear();
485    tableCFs.put(tableName5, new ArrayList<>());
486    tableCFs.get(tableName5).add("f1");
487    admin.appendPeerTableCFs(ID_ONE, tableCFs);
488    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
489    assertEquals(5, result.size());
490    assertTrue("Should contain t5", result.containsKey(tableName5));
491    // null means replication all cfs of tab5
492    assertNull(result.get(tableName5));
493
494    // append "table6" => ["f1"], then append "table6" => []
495    tableCFs.clear();
496    tableCFs.put(tableName6, new ArrayList<>());
497    tableCFs.get(tableName6).add("f1");
498    admin.appendPeerTableCFs(ID_ONE, tableCFs);
499    tableCFs.clear();
500    tableCFs.put(tableName6, new ArrayList<>());
501    admin.appendPeerTableCFs(ID_ONE, tableCFs);
502    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
503    assertEquals(6, result.size());
504    assertTrue("Should contain t6", result.containsKey(tableName6));
505    // null means replication all cfs of tab6
506    assertNull(result.get(tableName6));
507
508    admin.removePeer(ID_ONE);
509  }
510
511  @Test
512  public void testRemovePeerTableCFs() throws Exception {
513    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
514    rpc.setClusterKey(KEY_ONE);
515    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
516    final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
517    final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
518    final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
519
520    // Add a valid peer
521    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
522
523    // Update peer config, not replicate all user tables
524    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
525    rpc.setReplicateAllUserTables(false);
526    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
527
528    Map<TableName, List<String>> tableCFs = new HashMap<>();
529    try {
530      tableCFs.put(tableName3, null);
531      admin.removePeerTableCFs(ID_ONE, tableCFs);
532      assertTrue(false);
533    } catch (ReplicationException e) {
534    }
535    assertNull(admin.getPeerTableCFs(ID_ONE));
536
537    tableCFs.clear();
538    tableCFs.put(tableName1, null);
539    tableCFs.put(tableName2, new ArrayList<>());
540    tableCFs.get(tableName2).add("cf1");
541    admin.setPeerTableCFs(ID_ONE, tableCFs);
542    try {
543      tableCFs.clear();
544      tableCFs.put(tableName3, null);
545      admin.removePeerTableCFs(ID_ONE, tableCFs);
546      assertTrue(false);
547    } catch (ReplicationException e) {
548    }
549    Map<TableName, List<String>> result =
550      ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
551    assertEquals(2, result.size());
552    assertTrue("Should contain t1", result.containsKey(tableName1));
553    assertTrue("Should contain t2", result.containsKey(tableName2));
554    assertNull(result.get(tableName1));
555    assertEquals(1, result.get(tableName2).size());
556    assertEquals("cf1", result.get(tableName2).get(0));
557
558    try {
559      tableCFs.clear();
560      tableCFs.put(tableName1, new ArrayList<>());
561      tableCFs.get(tableName1).add("f1");
562      admin.removePeerTableCFs(ID_ONE, tableCFs);
563      assertTrue(false);
564    } catch (ReplicationException e) {
565    }
566    tableCFs.clear();
567    tableCFs.put(tableName1, null);
568    admin.removePeerTableCFs(ID_ONE, tableCFs);
569    result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
570    assertEquals(1, result.size());
571    assertEquals(1, result.get(tableName2).size());
572    assertEquals("cf1", result.get(tableName2).get(0));
573
574    try {
575      tableCFs.clear();
576      tableCFs.put(tableName2, null);
577      admin.removePeerTableCFs(ID_ONE, tableCFs);
578      fail();
579    } catch (ReplicationException e) {
580    }
581    tableCFs.clear();
582    tableCFs.put(tableName2, new ArrayList<>());
583    tableCFs.get(tableName2).add("cf1");
584    admin.removePeerTableCFs(ID_ONE, tableCFs);
585    assertNull(admin.getPeerTableCFs(ID_ONE));
586
587    tableCFs.clear();
588    tableCFs.put(tableName4, new ArrayList<>());
589    admin.setPeerTableCFs(ID_ONE, tableCFs);
590    admin.removePeerTableCFs(ID_ONE, tableCFs);
591    assertNull(admin.getPeerTableCFs(ID_ONE));
592
593    admin.removePeer(ID_ONE);
594  }
595
596  @Test
597  public void testSetPeerNamespaces() throws Exception {
598    String ns1 = "ns1";
599    String ns2 = "ns2";
600
601    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
602    rpc.setClusterKey(KEY_ONE);
603    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
604
605    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
606    rpc.setReplicateAllUserTables(false);
607    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
608
609    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
610    Set<String> namespaces = new HashSet<>();
611    namespaces.add(ns1);
612    namespaces.add(ns2);
613    rpc.setNamespaces(namespaces);
614    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
615    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
616    assertEquals(2, namespaces.size());
617    assertTrue(namespaces.contains(ns1));
618    assertTrue(namespaces.contains(ns2));
619
620    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
621    namespaces = new HashSet<>();
622    namespaces.add(ns1);
623    rpc.setNamespaces(namespaces);
624    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
625    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
626    assertEquals(1, namespaces.size());
627    assertTrue(namespaces.contains(ns1));
628
629    hbaseAdmin.removeReplicationPeer(ID_ONE);
630  }
631
632  @Test
633  public void testSetReplicateAllUserTables() throws Exception {
634    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
635    rpc.setClusterKey(KEY_ONE);
636    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
637
638    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
639    assertTrue(rpc.replicateAllUserTables());
640
641    rpc.setReplicateAllUserTables(false);
642    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
643    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
644    assertFalse(rpc.replicateAllUserTables());
645
646    rpc.setReplicateAllUserTables(true);
647    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
648    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
649    assertTrue(rpc.replicateAllUserTables());
650
651    hbaseAdmin.removeReplicationPeer(ID_ONE);
652  }
653
654  @Test
655  public void testPeerExcludeNamespaces() throws Exception {
656    String ns1 = "ns1";
657    String ns2 = "ns2";
658
659    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
660    rpc.setClusterKey(KEY_ONE);
661    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
662
663    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
664    assertTrue(rpc.replicateAllUserTables());
665
666    Set<String> namespaces = new HashSet<String>();
667    namespaces.add(ns1);
668    namespaces.add(ns2);
669    rpc.setExcludeNamespaces(namespaces);
670    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
671    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
672    assertEquals(2, namespaces.size());
673    assertTrue(namespaces.contains(ns1));
674    assertTrue(namespaces.contains(ns2));
675
676    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
677    namespaces = new HashSet<String>();
678    namespaces.add(ns1);
679    rpc.setExcludeNamespaces(namespaces);
680    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
681    namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeNamespaces();
682    assertEquals(1, namespaces.size());
683    assertTrue(namespaces.contains(ns1));
684
685    hbaseAdmin.removeReplicationPeer(ID_ONE);
686  }
687
688  @Test
689  public void testPeerExcludeTableCFs() throws Exception {
690    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
691    rpc.setClusterKey(KEY_ONE);
692    TableName tab1 = TableName.valueOf("t1");
693    TableName tab2 = TableName.valueOf("t2");
694    TableName tab3 = TableName.valueOf("t3");
695    TableName tab4 = TableName.valueOf("t4");
696
697    // Add a valid peer
698    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
699    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
700    assertTrue(rpc.replicateAllUserTables());
701
702    Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
703    tableCFs.put(tab1, null);
704    rpc.setExcludeTableCFsMap(tableCFs);
705    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
706    Map<TableName, List<String>> result =
707        hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
708    assertEquals(1, result.size());
709    assertEquals(true, result.containsKey(tab1));
710    assertNull(result.get(tab1));
711
712    tableCFs.put(tab2, new ArrayList<String>());
713    tableCFs.get(tab2).add("f1");
714    rpc.setExcludeTableCFsMap(tableCFs);
715    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
716    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
717    assertEquals(2, result.size());
718    assertTrue("Should contain t1", result.containsKey(tab1));
719    assertTrue("Should contain t2", result.containsKey(tab2));
720    assertNull(result.get(tab1));
721    assertEquals(1, result.get(tab2).size());
722    assertEquals("f1", result.get(tab2).get(0));
723
724    tableCFs.clear();
725    tableCFs.put(tab3, new ArrayList<String>());
726    tableCFs.put(tab4, new ArrayList<String>());
727    tableCFs.get(tab4).add("f1");
728    tableCFs.get(tab4).add("f2");
729    rpc.setExcludeTableCFsMap(tableCFs);
730    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
731    result = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getExcludeTableCFsMap();
732    assertEquals(2, result.size());
733    assertTrue("Should contain t3", result.containsKey(tab3));
734    assertTrue("Should contain t4", result.containsKey(tab4));
735    assertNull(result.get(tab3));
736    assertEquals(2, result.get(tab4).size());
737    assertEquals("f1", result.get(tab4).get(0));
738    assertEquals("f2", result.get(tab4).get(1));
739
740    hbaseAdmin.removeReplicationPeer(ID_ONE);
741  }
742
743  @Test
744  public void testPeerConfigConflict() throws Exception {
745    // Default replicate_all flag is true
746    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
747    rpc.setClusterKey(KEY_ONE);
748
749    String ns1 = "ns1";
750    Set<String> namespaces = new HashSet<String>();
751    namespaces.add(ns1);
752
753    TableName tab1 = TableName.valueOf("ns2:tabl");
754    Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
755    tableCfs.put(tab1, new ArrayList<String>());
756
757    try {
758      rpc.setNamespaces(namespaces);
759      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
760      fail("Should throw Exception."
761          + " When replicate all flag is true, no need to config namespaces");
762    } catch (IOException e) {
763      // OK
764      rpc.setNamespaces(null);
765    }
766
767    try {
768      rpc.setTableCFsMap(tableCfs);
769      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
770      fail("Should throw Exception."
771          + " When replicate all flag is true, no need to config table-cfs");
772    } catch (IOException e) {
773      // OK
774      rpc.setTableCFsMap(null);
775    }
776
777    // Set replicate_all flag to true
778    rpc.setReplicateAllUserTables(false);
779    try {
780      rpc.setExcludeNamespaces(namespaces);
781      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
782      fail("Should throw Exception."
783          + " When replicate all flag is false, no need to config exclude namespaces");
784    } catch (IOException e) {
785      // OK
786      rpc.setExcludeNamespaces(null);
787    }
788
789    try {
790      rpc.setExcludeTableCFsMap(tableCfs);
791      hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
792      fail("Should throw Exception."
793          + " When replicate all flag is false, no need to config exclude table-cfs");
794    } catch (IOException e) {
795      // OK
796      rpc.setExcludeTableCFsMap(null);
797    }
798
799    rpc.setNamespaces(namespaces);
800    rpc.setTableCFsMap(tableCfs);
801    // OK to add a new peer which replicate_all flag is false and with namespaces, table-cfs config
802    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
803
804    // Default replicate_all flag is true
805    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
806    rpc2.setClusterKey(KEY_SECOND);
807    rpc2.setExcludeNamespaces(namespaces);
808    rpc2.setExcludeTableCFsMap(tableCfs);
809    // OK to add a new peer which replicate_all flag is true and with exclude namespaces, exclude
810    // table-cfs config
811    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
812
813    hbaseAdmin.removeReplicationPeer(ID_ONE);
814    hbaseAdmin.removeReplicationPeer(ID_SECOND);
815  }
816
817  @Test
818  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
819    String ns1 = "ns1";
820    String ns2 = "ns2";
821    final TableName tableName1 = TableName.valueOf(ns1 + ":" + name.getMethodName());
822    final TableName tableName2 = TableName.valueOf(ns2 + ":" + name.getMethodName() + "2");
823
824    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
825    rpc.setClusterKey(KEY_ONE);
826    rpc.setReplicateAllUserTables(false);
827    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
828
829    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
830    Set<String> namespaces = new HashSet<String>();
831    namespaces.add(ns1);
832    rpc.setNamespaces(namespaces);
833    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
834    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
835    try {
836      Map<TableName, List<String>> tableCfs = new HashMap<>();
837      tableCfs.put(tableName1, new ArrayList<>());
838      rpc.setTableCFsMap(tableCfs);
839      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
840      fail("Should throw ReplicationException" + " Because table " + tableName1
841          + " conflict with namespace " + ns1);
842    } catch (Exception e) {
843      // OK
844    }
845
846    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
847    Map<TableName, List<String>> tableCfs = new HashMap<>();
848    tableCfs.put(tableName2, new ArrayList<>());
849    rpc.setTableCFsMap(tableCfs);
850    hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
851    rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
852    try {
853      namespaces.clear();
854      namespaces.add(ns2);
855      rpc.setNamespaces(namespaces);
856      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
857      fail("Should throw ReplicationException" + " Because namespace " + ns2
858          + " conflict with table " + tableName2);
859    } catch (Exception e) {
860      // OK
861    }
862
863    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
864    rpc2.setClusterKey(KEY_SECOND);
865    hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2);
866
867    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
868    Set<String> excludeNamespaces = new HashSet<String>();
869    excludeNamespaces.add(ns1);
870    rpc2.setExcludeNamespaces(excludeNamespaces);
871    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
872    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
873    try {
874      Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
875      excludeTableCfs.put(tableName1, new ArrayList<>());
876      rpc2.setExcludeTableCFsMap(excludeTableCfs);
877      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
878      fail("Should throw ReplicationException" + " Because exclude table " + tableName1
879          + " conflict with exclude namespace " + ns1);
880    } catch (Exception e) {
881      // OK
882    }
883
884    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
885    Map<TableName, List<String>> excludeTableCfs = new HashMap<>();
886    excludeTableCfs.put(tableName2, new ArrayList<>());
887    rpc2.setExcludeTableCFsMap(excludeTableCfs);
888    hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
889    rpc2 = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
890    try {
891      namespaces.clear();
892      namespaces.add(ns2);
893      rpc2.setNamespaces(namespaces);
894      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, rpc2);
895      fail("Should throw ReplicationException" + " Because exclude namespace " + ns2
896          + " conflict with exclude table " + tableName2);
897    } catch (Exception e) {
898      // OK
899    }
900
901    hbaseAdmin.removeReplicationPeer(ID_ONE);
902    hbaseAdmin.removeReplicationPeer(ID_SECOND);
903  }
904
905  @Test
906  public void testPeerBandwidth() throws Exception {
907    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
908    rpc.setClusterKey(KEY_ONE);
909    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
910
911    rpc = admin.getPeerConfig(ID_ONE);
912    assertEquals(0, rpc.getBandwidth());
913
914    rpc.setBandwidth(2097152);
915    admin.updatePeerConfig(ID_ONE, rpc);
916
917    assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth());
918    admin.removePeer(ID_ONE);
919  }
920
921  @Test
922  public void testPeerClusterKey() throws Exception {
923    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
924    builder.setClusterKey(KEY_ONE);
925    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
926
927    try {
928      builder.setClusterKey(KEY_SECOND);
929      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
930      fail("Change cluster key on an existing peer is not allowed");
931    } catch (Exception e) {
932      // OK
933    }
934  }
935
936  @Test
937  public void testPeerReplicationEndpointImpl() throws Exception {
938    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
939    builder.setClusterKey(KEY_ONE);
940    builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
941    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
942
943    try {
944      builder.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName());
945      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
946      fail("Change replication endpoint implementation class on an existing peer is not allowed");
947    } catch (Exception e) {
948      // OK
949    }
950
951    try {
952      builder = ReplicationPeerConfig.newBuilder();
953      builder.setClusterKey(KEY_ONE);
954      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
955      fail("Change replication endpoint implementation class on an existing peer is not allowed");
956    } catch (Exception e) {
957      // OK
958    }
959
960    builder = ReplicationPeerConfig.newBuilder();
961    builder.setClusterKey(KEY_SECOND);
962    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
963
964    try {
965      builder.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName());
966      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
967      fail("Change replication endpoint implementation class on an existing peer is not allowed");
968    } catch (Exception e) {
969      // OK
970    }
971  }
972
973  @Test
974  public void testPeerRemoteWALDir() throws Exception {
975    TableName tableName = TableName.valueOf(name.getMethodName());
976
977    String rootDir = "hdfs://srv1:9999/hbase";
978    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
979    builder.setClusterKey(KEY_ONE);
980    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
981
982    ReplicationPeerConfig rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
983    assertNull(rpc.getRemoteWALDir());
984
985    builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
986    try {
987      hbaseAdmin.updateReplicationPeerConfig(ID_ONE, builder.build());
988      fail("Change remote wal dir is not allowed");
989    } catch (Exception e) {
990      // OK
991      LOG.info("Expected error:", e);
992    }
993
994    builder = ReplicationPeerConfig.newBuilder();
995    builder.setClusterKey(KEY_SECOND);
996    builder.setRemoteWALDir("whatever");
997
998    try {
999      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
1000      fail("Only support replicated table config for sync replication");
1001    } catch (Exception e) {
1002      // OK
1003      LOG.info("Expected error:", e);
1004    }
1005
1006    builder.setReplicateAllUserTables(false);
1007    Set<String> namespaces = new HashSet<String>();
1008    namespaces.add("ns1");
1009    builder.setNamespaces(namespaces);
1010    try {
1011      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
1012      fail("Only support replicated table config for sync replication");
1013    } catch (Exception e) {
1014      // OK
1015      LOG.info("Expected error:", e);
1016    }
1017
1018    builder.setNamespaces(null);
1019    try {
1020      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
1021      fail("Only support replicated table config for sync replication, and tables can't be empty");
1022    } catch (Exception e) {
1023      // OK
1024      LOG.info("Expected error:", e);
1025    }
1026
1027    Map<TableName, List<String>> tableCfs = new HashMap<>();
1028    tableCfs.put(tableName, Arrays.asList("cf1"));
1029    builder.setTableCFsMap(tableCfs);
1030    try {
1031      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
1032      fail("Only support replicated table config for sync replication");
1033    } catch (Exception e) {
1034      // OK
1035      LOG.info("Expected error:", e);
1036    }
1037
1038    tableCfs = new HashMap<>();
1039    tableCfs.put(tableName, new ArrayList<>());
1040    builder.setTableCFsMap(tableCfs);
1041    try {
1042      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
1043      fail("The remote WAL dir must be absolute");
1044    } catch (Exception e) {
1045      // OK
1046      LOG.info("Expected error:", e);
1047    }
1048
1049    builder.setRemoteWALDir("/hbase/remoteWALs");
1050    try {
1051      hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
1052      fail("The remote WAL dir must be qualified");
1053    } catch (Exception e) {
1054      // OK
1055      LOG.info("Expected error:", e);
1056    }
1057
1058    builder.setRemoteWALDir(rootDir);
1059    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
1060    rpc = hbaseAdmin.getReplicationPeerConfig(ID_SECOND);
1061    assertEquals(rootDir, rpc.getRemoteWALDir());
1062
1063    try {
1064      builder.setRemoteWALDir("hdfs://srv2:8888/hbase");
1065      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
1066      fail("Change remote wal dir is not allowed");
1067    } catch (Exception e) {
1068      // OK
1069      LOG.info("Expected error:", e);
1070    }
1071
1072    try {
1073      builder.setRemoteWALDir(null);
1074      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
1075      fail("Change remote wal dir is not allowed");
1076    } catch (Exception e) {
1077      // OK
1078      LOG.info("Expected error:", e);
1079    }
1080
1081    try {
1082      builder = ReplicationPeerConfig.newBuilder(rpc);
1083      tableCfs = new HashMap<>();
1084      tableCfs.put(TableName.valueOf("ns1:" + name.getMethodName()), new ArrayList<>());
1085      builder.setTableCFsMap(tableCfs);
1086      hbaseAdmin.updateReplicationPeerConfig(ID_SECOND, builder.build());
1087      fail(
1088        "Change replicated table config on an existing synchronous peer is not allowed");
1089    } catch (Exception e) {
1090      // OK
1091      LOG.info("Expected error:", e);
1092    }
1093  }
1094
1095  @Test
1096  public void testTransitSyncReplicationPeerState() throws Exception {
1097    TableName tableName = TableName.valueOf(name.getMethodName());
1098    TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
1099    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
1100    builder.setClusterKey(KEY_ONE);
1101    builder.setReplicateAllUserTables(false);
1102    hbaseAdmin.addReplicationPeer(ID_ONE, builder.build());
1103    assertEquals(SyncReplicationState.NONE,
1104      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_ONE));
1105
1106    try {
1107      hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_ONE,
1108        SyncReplicationState.DOWNGRADE_ACTIVE);
1109      fail("Can't transit sync replication state if replication peer don't config remote wal dir");
1110    } catch (Exception e) {
1111      // OK
1112      LOG.info("Expected error:", e);
1113    }
1114
1115    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
1116    builder = ReplicationPeerConfig.newBuilder();
1117    builder.setClusterKey(KEY_SECOND);
1118    builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
1119      TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
1120    builder.setReplicateAllUserTables(false);
1121    Map<TableName, List<String>> tableCfs = new HashMap<>();
1122    tableCfs.put(tableName, new ArrayList<>());
1123    builder.setTableCFsMap(tableCfs);
1124    hbaseAdmin.addReplicationPeer(ID_SECOND, builder.build());
1125    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
1126      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1127
1128    // Disable and enable peer don't affect SyncReplicationState
1129    hbaseAdmin.disableReplicationPeer(ID_SECOND);
1130    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
1131      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1132    hbaseAdmin.enableReplicationPeer(ID_SECOND);
1133    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
1134      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1135
1136    try {
1137      hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
1138      fail("Can't transit sync replication state to ACTIVE if remote wal dir does not exist");
1139    } catch (Exception e) {
1140      // OK
1141      LOG.info("Expected error:", e);
1142    }
1143    TEST_UTIL.getTestFileSystem()
1144      .mkdirs(ReplicationUtils.getPeerRemoteWALDir(rootDir, ID_SECOND));
1145    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
1146    assertEquals(SyncReplicationState.ACTIVE,
1147      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1148
1149    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
1150    assertEquals(SyncReplicationState.STANDBY,
1151      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1152
1153    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
1154      SyncReplicationState.DOWNGRADE_ACTIVE);
1155    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
1156      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1157
1158    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
1159    assertEquals(SyncReplicationState.ACTIVE,
1160      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1161
1162    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
1163      SyncReplicationState.DOWNGRADE_ACTIVE);
1164    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
1165      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1166
1167    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.STANDBY);
1168    assertEquals(SyncReplicationState.STANDBY,
1169      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1170    try {
1171      hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
1172      fail("Can't transit sync replication state from STANDBY to ACTIVE");
1173    } catch (Exception e) {
1174      // OK
1175      LOG.info("Expected error:", e);
1176    }
1177    hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND,
1178      SyncReplicationState.DOWNGRADE_ACTIVE);
1179    assertEquals(SyncReplicationState.DOWNGRADE_ACTIVE,
1180      hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
1181    hbaseAdmin.removeReplicationPeer(ID_ONE);
1182    hbaseAdmin.removeReplicationPeer(ID_SECOND);
1183    assertEquals(0, hbaseAdmin.listReplicationPeers().size());
1184  }
1185}