001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static java.util.stream.Collectors.toList;
021import static java.util.stream.Collectors.toSet;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.util.HashMap;
030import java.util.Iterator;
031import java.util.List;
032import java.util.Map;
033import java.util.Random;
034import java.util.Set;
035import java.util.stream.Stream;
036import org.apache.hadoop.hbase.HBaseClassTestRule;
037import org.apache.hadoop.hbase.HBaseZKTestingUtility;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.testclassification.ReplicationTests;
041import org.junit.AfterClass;
042import org.junit.BeforeClass;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046
047@Category({ ReplicationTests.class, MediumTests.class })
048public class TestZKReplicationPeerStorage {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052      HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
053
054  private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
055
056  private static ZKReplicationPeerStorage STORAGE;
057
058  @BeforeClass
059  public static void setUp() throws Exception {
060    UTIL.startMiniZKCluster();
061    STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
062  }
063
064  @AfterClass
065  public static void tearDown() throws IOException {
066    UTIL.shutdownMiniZKCluster();
067  }
068
069  private Set<String> randNamespaces(Random rand) {
070    return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
071        .collect(toSet());
072  }
073
074  private Map<TableName, List<String>> randTableCFs(Random rand) {
075    int size = rand.nextInt(5);
076    Map<TableName, List<String>> map = new HashMap<>();
077    for (int i = 0; i < size; i++) {
078      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
079      List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
080          .limit(rand.nextInt(5)).collect(toList());
081      map.put(tn, cfs);
082    }
083    return map;
084  }
085
086  private ReplicationPeerConfig getConfig(int seed) {
087    Random rand = new Random(seed);
088    return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
089        .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
090        .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
091        .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
092        .setBandwidth(rand.nextInt(1000)).build();
093  }
094
095  private void assertSetEquals(Set<String> expected, Set<String> actual) {
096    if (expected == null || expected.size() == 0) {
097      assertTrue(actual == null || actual.size() == 0);
098      return;
099    }
100    assertEquals(expected.size(), actual.size());
101    expected.forEach(s -> assertTrue(actual.contains(s)));
102  }
103
104  private void assertMapEquals(Map<TableName, List<String>> expected,
105      Map<TableName, List<String>> actual) {
106    if (expected == null || expected.size() == 0) {
107      assertTrue(actual == null || actual.size() == 0);
108      return;
109    }
110    assertEquals(expected.size(), actual.size());
111    expected.forEach((expectedTn, expectedCFs) -> {
112      List<String> actualCFs = actual.get(expectedTn);
113      if (expectedCFs == null || expectedCFs.size() == 0) {
114        assertTrue(actual.containsKey(expectedTn));
115        assertTrue(actualCFs == null || actualCFs.size() == 0);
116      } else {
117        assertNotNull(actualCFs);
118        assertEquals(expectedCFs.size(), actualCFs.size());
119        for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator();
120          expectedIt.hasNext();) {
121          assertEquals(expectedIt.next(), actualIt.next());
122        }
123      }
124    });
125  }
126
127  private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
128    assertEquals(expected.getClusterKey(), actual.getClusterKey());
129    assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
130    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
131    assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
132    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
133    assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
134    assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
135    assertEquals(expected.getBandwidth(), actual.getBandwidth());
136  }
137
138  @Test
139  public void test() throws ReplicationException {
140    int peerCount = 10;
141    for (int i = 0; i < peerCount; i++) {
142      STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
143    }
144    List<String> peerIds = STORAGE.listPeerIds();
145    assertEquals(peerCount, peerIds.size());
146    for (String peerId : peerIds) {
147      int seed = Integer.parseInt(peerId);
148      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
149    }
150    for (int i = 0; i < peerCount; i++) {
151      STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
152    }
153    for (String peerId : peerIds) {
154      int seed = Integer.parseInt(peerId);
155      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
156    }
157    for (int i = 0; i < peerCount; i++) {
158      assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
159    }
160    for (int i = 0; i < peerCount; i++) {
161      STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
162    }
163    for (int i = 0; i < peerCount; i++) {
164      assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
165    }
166    String toRemove = Integer.toString(peerCount / 2);
167    STORAGE.removePeer(toRemove);
168    peerIds = STORAGE.listPeerIds();
169    assertEquals(peerCount - 1, peerIds.size());
170    assertFalse(peerIds.contains(toRemove));
171
172    try {
173      STORAGE.getPeerConfig(toRemove);
174      fail("Should throw a ReplicationException when get peer config of a peerId");
175    } catch (ReplicationException e) {
176    }
177  }
178}