001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static java.util.stream.Collectors.toList;
021import static java.util.stream.Collectors.toSet;
022import static org.hamcrest.CoreMatchers.instanceOf;
023import static org.hamcrest.MatcherAssert.assertThat;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotNull;
027import static org.junit.Assert.assertNull;
028import static org.junit.Assert.assertTrue;
029import static org.junit.Assert.fail;
030
031import java.io.IOException;
032import java.util.HashMap;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.Random;
037import java.util.Set;
038import java.util.stream.Stream;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.HBaseClassTestRule;
041import org.apache.hadoop.hbase.HBaseZKTestingUtility;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.ReplicationTests;
046import org.apache.zookeeper.KeeperException;
047import org.junit.After;
048import org.junit.AfterClass;
049import org.junit.BeforeClass;
050import org.junit.ClassRule;
051import org.junit.Test;
052import org.junit.experimental.categories.Category;
053
054@Category({ ReplicationTests.class, MediumTests.class })
055public class TestZKReplicationPeerStorage {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
060
061  private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
062  private static final Random RNG = new Random(); // Seed may be set with Random#setSeed
063  private static ZKReplicationPeerStorage STORAGE;
064
065  @BeforeClass
066  public static void setUp() throws Exception {
067    UTIL.startMiniZKCluster();
068    STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
069  }
070
071  @AfterClass
072  public static void tearDown() throws IOException {
073    UTIL.shutdownMiniZKCluster();
074  }
075
076  @After
077  public void cleanCustomConfigurations() {
078    UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
079  }
080
081  private Set<String> randNamespaces(Random rand) {
082    return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
083      .collect(toSet());
084  }
085
086  private Map<TableName, List<String>> randTableCFs(Random rand) {
087    int size = rand.nextInt(5);
088    Map<TableName, List<String>> map = new HashMap<>();
089    for (int i = 0; i < size; i++) {
090      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
091      List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
092        .limit(rand.nextInt(5)).collect(toList());
093      map.put(tn, cfs);
094    }
095    return map;
096  }
097
098  private ReplicationPeerConfig getConfig(int seed) {
099    RNG.setSeed(seed);
100    return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong()))
101      .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong()))
102      .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG))
103      .setTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean())
104      .setBandwidth(RNG.nextInt(1000)).build();
105  }
106
107  private void assertSetEquals(Set<String> expected, Set<String> actual) {
108    if (expected == null || expected.size() == 0) {
109      assertTrue(actual == null || actual.size() == 0);
110      return;
111    }
112    assertEquals(expected.size(), actual.size());
113    expected.forEach(s -> assertTrue(actual.contains(s)));
114  }
115
116  private void assertMapEquals(Map<TableName, List<String>> expected,
117    Map<TableName, List<String>> actual) {
118    if (expected == null || expected.size() == 0) {
119      assertTrue(actual == null || actual.size() == 0);
120      return;
121    }
122    assertEquals(expected.size(), actual.size());
123    expected.forEach((expectedTn, expectedCFs) -> {
124      List<String> actualCFs = actual.get(expectedTn);
125      if (expectedCFs == null || expectedCFs.size() == 0) {
126        assertTrue(actual.containsKey(expectedTn));
127        assertTrue(actualCFs == null || actualCFs.size() == 0);
128      } else {
129        assertNotNull(actualCFs);
130        assertEquals(expectedCFs.size(), actualCFs.size());
131        for (Iterator<String> expectedIt = expectedCFs.iterator(),
132            actualIt = actualCFs.iterator(); expectedIt.hasNext();) {
133          assertEquals(expectedIt.next(), actualIt.next());
134        }
135      }
136    });
137  }
138
139  private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
140    assertEquals(expected.getClusterKey(), actual.getClusterKey());
141    assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
142    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
143    assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
144    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
145    assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
146    assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
147    assertEquals(expected.getBandwidth(), actual.getBandwidth());
148  }
149
150  @Test
151  public void test() throws ReplicationException {
152    int peerCount = 10;
153    for (int i = 0; i < peerCount; i++) {
154      STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
155    }
156    List<String> peerIds = STORAGE.listPeerIds();
157    assertEquals(peerCount, peerIds.size());
158    for (String peerId : peerIds) {
159      int seed = Integer.parseInt(peerId);
160      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
161    }
162    for (int i = 0; i < peerCount; i++) {
163      STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
164    }
165    for (String peerId : peerIds) {
166      int seed = Integer.parseInt(peerId);
167      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
168    }
169    for (int i = 0; i < peerCount; i++) {
170      assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
171    }
172    for (int i = 0; i < peerCount; i++) {
173      STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
174    }
175    for (int i = 0; i < peerCount; i++) {
176      assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
177    }
178    String toRemove = Integer.toString(peerCount / 2);
179    STORAGE.removePeer(toRemove);
180    peerIds = STORAGE.listPeerIds();
181    assertEquals(peerCount - 1, peerIds.size());
182    assertFalse(peerIds.contains(toRemove));
183
184    try {
185      STORAGE.getPeerConfig(toRemove);
186      fail("Should throw a ReplicationException when get peer config of a peerId");
187    } catch (ReplicationException e) {
188    }
189  }
190
191  @Test
192  public void testBaseReplicationPeerConfig() throws ReplicationException {
193    String customPeerConfigKey = "hbase.xxx.custom_config";
194    String customPeerConfigValue = "test";
195    String customPeerConfigUpdatedValue = "testUpdated";
196
197    String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
198    String customPeerConfigSecondValue = "testSecond";
199    String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
200
201    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
202
203    // custom config not present
204    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
205
206    Configuration conf = UTIL.getConfiguration();
207    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
208      customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";")
209        .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
210
211    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
212      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
213
214    // validates base configs are present in replicationPeerConfig
215    assertEquals(customPeerConfigValue,
216      updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
217    assertEquals(customPeerConfigSecondValue,
218      updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey));
219
220    // validates base configs get updated values even if config already present
221    conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
222    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
223      customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";")
224        .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
225
226    ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil
227      .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
228
229    assertEquals(customPeerConfigUpdatedValue,
230      replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey));
231    assertEquals(customPeerConfigSecondUpdatedValue,
232      replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey));
233  }
234
235  @Test
236  public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
237    String customPeerConfigKey = "hbase.xxx.custom_config";
238    String customPeerConfigValue = "test";
239    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
240
241    // custom config not present
242    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
243
244    Configuration conf = UTIL.getConfiguration();
245    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
246      customPeerConfigKey.concat("=").concat(customPeerConfigValue));
247
248    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
249      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
250
251    // validates base configs are present in replicationPeerConfig
252    assertEquals(customPeerConfigValue,
253      updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
254
255    conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
256    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
257      customPeerConfigKey.concat("=").concat(""));
258
259    ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil
260      .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
261
262    assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
263  }
264
265  @Test
266  public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
267    throws ReplicationException {
268    String customPeerConfigKey = "hbase.xxx.custom_config";
269    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
270
271    // custom config not present
272    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
273    Configuration conf = UTIL.getConfiguration();
274    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
275      customPeerConfigKey.concat("=").concat(""));
276
277    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil
278      .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
279    assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
280  }
281
282  @Test
283  public void testPeerNameControl() throws Exception {
284    String clusterKey = "key";
285    STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(),
286      true);
287
288    try {
289      STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(),
290        true);
291      fail();
292    } catch (ReplicationException e) {
293      assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class));
294    } finally {
295      // clean up
296      STORAGE.removePeer("6");
297    }
298  }
299}