001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static java.util.stream.Collectors.toList;
021import static java.util.stream.Collectors.toSet;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNull;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.util.HashMap;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.Random;
035import java.util.Set;
036import java.util.stream.Stream;
037
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.hbase.HBaseClassTestRule;
040import org.apache.hadoop.hbase.HBaseZKTestingUtility;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.testclassification.ReplicationTests;
045import org.junit.AfterClass;
046import org.junit.BeforeClass;
047import org.junit.ClassRule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050
051@Category({ ReplicationTests.class, MediumTests.class })
052public class TestZKReplicationPeerStorage {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056      HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
057
058  private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
059
060  private static ZKReplicationPeerStorage STORAGE;
061
062  @BeforeClass
063  public static void setUp() throws Exception {
064    UTIL.startMiniZKCluster();
065    STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
066  }
067
068  @AfterClass
069  public static void tearDown() throws IOException {
070    UTIL.shutdownMiniZKCluster();
071  }
072
073  private Set<String> randNamespaces(Random rand) {
074    return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
075        .collect(toSet());
076  }
077
078  private Map<TableName, List<String>> randTableCFs(Random rand) {
079    int size = rand.nextInt(5);
080    Map<TableName, List<String>> map = new HashMap<>();
081    for (int i = 0; i < size; i++) {
082      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
083      List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
084          .limit(rand.nextInt(5)).collect(toList());
085      map.put(tn, cfs);
086    }
087    return map;
088  }
089
090  private ReplicationPeerConfig getConfig(int seed) {
091    Random rand = new Random(seed);
092    return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
093        .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
094        .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
095        .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
096        .setBandwidth(rand.nextInt(1000)).build();
097  }
098
099  private void assertSetEquals(Set<String> expected, Set<String> actual) {
100    if (expected == null || expected.size() == 0) {
101      assertTrue(actual == null || actual.size() == 0);
102      return;
103    }
104    assertEquals(expected.size(), actual.size());
105    expected.forEach(s -> assertTrue(actual.contains(s)));
106  }
107
108  private void assertMapEquals(Map<TableName, List<String>> expected,
109      Map<TableName, List<String>> actual) {
110    if (expected == null || expected.size() == 0) {
111      assertTrue(actual == null || actual.size() == 0);
112      return;
113    }
114    assertEquals(expected.size(), actual.size());
115    expected.forEach((expectedTn, expectedCFs) -> {
116      List<String> actualCFs = actual.get(expectedTn);
117      if (expectedCFs == null || expectedCFs.size() == 0) {
118        assertTrue(actual.containsKey(expectedTn));
119        assertTrue(actualCFs == null || actualCFs.size() == 0);
120      } else {
121        assertNotNull(actualCFs);
122        assertEquals(expectedCFs.size(), actualCFs.size());
123        for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator();
124          expectedIt.hasNext();) {
125          assertEquals(expectedIt.next(), actualIt.next());
126        }
127      }
128    });
129  }
130
131  private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
132    assertEquals(expected.getClusterKey(), actual.getClusterKey());
133    assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
134    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
135    assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
136    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
137    assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
138    assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
139    assertEquals(expected.getBandwidth(), actual.getBandwidth());
140  }
141
142  @Test
143  public void test() throws ReplicationException {
144    int peerCount = 10;
145    for (int i = 0; i < peerCount; i++) {
146      STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
147    }
148    List<String> peerIds = STORAGE.listPeerIds();
149    assertEquals(peerCount, peerIds.size());
150    for (String peerId : peerIds) {
151      int seed = Integer.parseInt(peerId);
152      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
153    }
154    for (int i = 0; i < peerCount; i++) {
155      STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
156    }
157    for (String peerId : peerIds) {
158      int seed = Integer.parseInt(peerId);
159      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
160    }
161    for (int i = 0; i < peerCount; i++) {
162      assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
163    }
164    for (int i = 0; i < peerCount; i++) {
165      STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
166    }
167    for (int i = 0; i < peerCount; i++) {
168      assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
169    }
170    String toRemove = Integer.toString(peerCount / 2);
171    STORAGE.removePeer(toRemove);
172    peerIds = STORAGE.listPeerIds();
173    assertEquals(peerCount - 1, peerIds.size());
174    assertFalse(peerIds.contains(toRemove));
175
176    try {
177      STORAGE.getPeerConfig(toRemove);
178      fail("Should throw a ReplicationException when get peer config of a peerId");
179    } catch (ReplicationException e) {
180    }
181  }
182
183  @Test
184  public void testBaseReplicationPeerConfig() {
185    String customPeerConfigKey = "hbase.xxx.custom_config";
186    String customPeerConfigValue = "test";
187    String customPeerConfigUpdatedValue = "testUpdated";
188
189    String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
190    String customPeerConfigSecondValue = "testSecond";
191    String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
192
193    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
194
195    // custom config not present
196    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
197
198    Configuration conf = UTIL.getConfiguration();
199    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
200      customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";").
201        concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
202
203    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
204      addBasePeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);
205
206    // validates base configs are present in replicationPeerConfig
207    assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
208      get(customPeerConfigKey));
209    assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration().
210      get(customPeerConfigSecondKey));
211
212    // validates base configs does not override value if config already present
213    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
214      customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";").
215        concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
216
217    ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil.
218      addBasePeerConfigsIfNotPresent(conf,updatedReplicationPeerConfig);
219
220    assertEquals(customPeerConfigValue, replicationPeerConfigAfterValueUpdate.
221      getConfiguration().get(customPeerConfigKey));
222    assertEquals(customPeerConfigSecondValue, replicationPeerConfigAfterValueUpdate.
223      getConfiguration().get(customPeerConfigSecondKey));
224  }
225}