001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static java.util.stream.Collectors.toList;
021import static java.util.stream.Collectors.toSet;
022import static org.hamcrest.CoreMatchers.instanceOf;
023import static org.hamcrest.MatcherAssert.assertThat;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertFalse;
026import static org.junit.Assert.assertNotEquals;
027import static org.junit.Assert.assertNotNull;
028import static org.junit.Assert.assertNull;
029import static org.junit.Assert.assertTrue;
030import static org.junit.Assert.fail;
031
032import java.io.IOException;
033import java.util.HashMap;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Random;
038import java.util.Set;
039import java.util.stream.Stream;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.HBaseClassTestRule;
042import org.apache.hadoop.hbase.HBaseZKTestingUtil;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.testclassification.ReplicationTests;
047import org.apache.hadoop.hbase.zookeeper.ZKUtil;
048import org.apache.zookeeper.KeeperException;
049import org.junit.After;
050import org.junit.AfterClass;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055
056@Category({ ReplicationTests.class, MediumTests.class })
057public class TestZKReplicationPeerStorage {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061      HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
062
063  private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil();
064
065  private static ZKReplicationPeerStorage STORAGE;
066
067  @BeforeClass
068  public static void setUp() throws Exception {
069    UTIL.startMiniZKCluster();
070    STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
071  }
072
073  @AfterClass
074  public static void tearDown() throws IOException {
075    UTIL.shutdownMiniZKCluster();
076  }
077
078  @After
079  public void cleanCustomConfigurations() {
080    UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
081  }
082
083  private Set<String> randNamespaces(Random rand) {
084    return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
085        .collect(toSet());
086  }
087
088  private Map<TableName, List<String>> randTableCFs(Random rand) {
089    int size = rand.nextInt(5);
090    Map<TableName, List<String>> map = new HashMap<>();
091    for (int i = 0; i < size; i++) {
092      TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
093      List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
094          .limit(rand.nextInt(5)).collect(toList());
095      map.put(tn, cfs);
096    }
097    return map;
098  }
099
100  private ReplicationPeerConfig getConfig(int seed) {
101    Random rand = new Random(seed);
102    return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
103        .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
104        .setRemoteWALDir(Long.toHexString(rand.nextLong())).setNamespaces(randNamespaces(rand))
105        .setExcludeNamespaces(randNamespaces(rand)).setTableCFsMap(randTableCFs(rand))
106        .setExcludeTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
107        .setBandwidth(rand.nextInt(1000)).build();
108  }
109
110  private void assertSetEquals(Set<String> expected, Set<String> actual) {
111    if (expected == null || expected.size() == 0) {
112      assertTrue(actual == null || actual.size() == 0);
113      return;
114    }
115    assertEquals(expected.size(), actual.size());
116    expected.forEach(s -> assertTrue(actual.contains(s)));
117  }
118
119  private void assertMapEquals(Map<TableName, List<String>> expected,
120      Map<TableName, List<String>> actual) {
121    if (expected == null || expected.size() == 0) {
122      assertTrue(actual == null || actual.size() == 0);
123      return;
124    }
125    assertEquals(expected.size(), actual.size());
126    expected.forEach((expectedTn, expectedCFs) -> {
127      List<String> actualCFs = actual.get(expectedTn);
128      if (expectedCFs == null || expectedCFs.size() == 0) {
129        assertTrue(actual.containsKey(expectedTn));
130        assertTrue(actualCFs == null || actualCFs.size() == 0);
131      } else {
132        assertNotNull(actualCFs);
133        assertEquals(expectedCFs.size(), actualCFs.size());
134        for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator();
135          expectedIt.hasNext();) {
136          assertEquals(expectedIt.next(), actualIt.next());
137        }
138      }
139    });
140  }
141
142  private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
143    assertEquals(expected.getClusterKey(), actual.getClusterKey());
144    assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
145    assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
146    assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
147    assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
148    assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
149    assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
150    assertEquals(expected.getBandwidth(), actual.getBandwidth());
151  }
152
153  @Test
154  public void test() throws ReplicationException {
155    int peerCount = 10;
156    for (int i = 0; i < peerCount; i++) {
157      STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0,
158        SyncReplicationState.valueOf(i % 4));
159    }
160    List<String> peerIds = STORAGE.listPeerIds();
161    assertEquals(peerCount, peerIds.size());
162    for (String peerId : peerIds) {
163      int seed = Integer.parseInt(peerId);
164      assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
165    }
166    for (int i = 0; i < peerCount; i++) {
167      STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
168    }
169    for (String peerId : peerIds) {
170      int seed = Integer.parseInt(peerId);
171      assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
172    }
173    for (int i = 0; i < peerCount; i++) {
174      assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
175    }
176    for (int i = 0; i < peerCount; i++) {
177      STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
178    }
179    for (int i = 0; i < peerCount; i++) {
180      assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
181    }
182    for (int i = 0; i < peerCount; i++) {
183      assertEquals(SyncReplicationState.valueOf(i % 4),
184        STORAGE.getPeerSyncReplicationState(Integer.toString(i)));
185    }
186    String toRemove = Integer.toString(peerCount / 2);
187    STORAGE.removePeer(toRemove);
188    peerIds = STORAGE.listPeerIds();
189    assertEquals(peerCount - 1, peerIds.size());
190    assertFalse(peerIds.contains(toRemove));
191
192    try {
193      STORAGE.getPeerConfig(toRemove);
194      fail("Should throw a ReplicationException when getting peer config of a removed peer");
195    } catch (ReplicationException e) {
196    }
197  }
198
199  @Test
200  public void testNoSyncReplicationState()
201      throws ReplicationException, KeeperException, IOException {
202    // This could happen for a peer created before we introduce sync replication.
203    String peerId = "testNoSyncReplicationState";
204    try {
205      STORAGE.getPeerSyncReplicationState(peerId);
206      fail("Should throw a ReplicationException when getting state of inexist peer");
207    } catch (ReplicationException e) {
208      // expected
209    }
210    try {
211      STORAGE.getPeerNewSyncReplicationState(peerId);
212      fail("Should throw a ReplicationException when getting state of inexist peer");
213    } catch (ReplicationException e) {
214      // expected
215    }
216    STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE);
217    // delete the sync replication state node to simulate
218    ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId));
219    ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getNewSyncReplicationStateNode(peerId));
220    // should not throw exception as the peer exists
221    assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId));
222    assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId));
223    // make sure we create the node for the old format peer
224    assertNotEquals(-1,
225      ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId)));
226    assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
227      STORAGE.getNewSyncReplicationStateNode(peerId)));
228  }
229
230  @Test
231  public void testBaseReplicationPeerConfig() throws ReplicationException {
232    String customPeerConfigKey = "hbase.xxx.custom_config";
233    String customPeerConfigValue = "test";
234    String customPeerConfigUpdatedValue = "testUpdated";
235
236    String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
237    String customPeerConfigSecondValue = "testSecond";
238    String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
239
240    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
241
242    // custom config not present
243    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
244
245    Configuration conf = UTIL.getConfiguration();
246    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
247      customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";").
248        concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
249
250    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
251      updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
252
253    // validates base configs are present in replicationPeerConfig
254    assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
255      get(customPeerConfigKey));
256    assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration().
257      get(customPeerConfigSecondKey));
258
259    // validates base configs get updated values even if config already present
260    conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
261    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
262      customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";").
263        concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
264
265    ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil.
266      updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
267
268    assertEquals(customPeerConfigUpdatedValue, replicationPeerConfigAfterValueUpdate.
269      getConfiguration().get(customPeerConfigKey));
270    assertEquals(customPeerConfigSecondUpdatedValue, replicationPeerConfigAfterValueUpdate.
271      getConfiguration().get(customPeerConfigSecondKey));
272  }
273
274  @Test
275  public void testBaseReplicationRemovePeerConfig() throws ReplicationException {
276    String customPeerConfigKey = "hbase.xxx.custom_config";
277    String customPeerConfigValue = "test";
278    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
279
280    // custom config not present
281    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
282
283    Configuration conf = UTIL.getConfiguration();
284    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
285      customPeerConfigKey.concat("=").concat(customPeerConfigValue));
286
287    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
288      updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
289
290    // validates base configs are present in replicationPeerConfig
291    assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
292      get(customPeerConfigKey));
293
294    conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
295    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
296      customPeerConfigKey.concat("=").concat(""));
297
298    ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil.
299      updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig);
300
301    assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey));
302  }
303
304  @Test
305  public void testBaseReplicationRemovePeerConfigWithNoExistingConfig()
306    throws ReplicationException {
307    String customPeerConfigKey = "hbase.xxx.custom_config";
308    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
309
310    // custom config not present
311    assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
312    Configuration conf = UTIL.getConfiguration();
313    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
314      customPeerConfigKey.concat("=").concat(""));
315
316    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
317      updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig);
318    assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
319  }
320
321  @Test
322  public void testPeerNameControl() throws Exception {
323    String clusterKey = "key";
324    STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true,
325      SyncReplicationState.NONE);
326
327    try {
328      STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(),
329        true, SyncReplicationState.NONE);
330      fail();
331    } catch (ReplicationException e) {
332      assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class));
333    } finally {
334      // clean up
335      STORAGE.removePeer("6");
336    }
337  }
338}