001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static java.util.stream.Collectors.toList; 021import static java.util.stream.Collectors.toSet; 022import static org.hamcrest.CoreMatchers.instanceOf; 023import static org.hamcrest.MatcherAssert.assertThat; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotNull; 027import static org.junit.Assert.assertNull; 028import static org.junit.Assert.assertTrue; 029import static org.junit.Assert.fail; 030 031import java.io.IOException; 032import java.util.HashMap; 033import java.util.Iterator; 034import java.util.List; 035import java.util.Map; 036import java.util.Random; 037import java.util.Set; 038import java.util.stream.Stream; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseZKTestingUtility; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 044import org.apache.hadoop.hbase.testclassification.MediumTests; 045import org.apache.hadoop.hbase.testclassification.ReplicationTests; 046import org.apache.zookeeper.KeeperException; 047import org.junit.After; 048import org.junit.AfterClass; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053 054@Category({ ReplicationTests.class, MediumTests.class }) 055public class TestZKReplicationPeerStorage { 056 057 @ClassRule 058 public static final HBaseClassTestRule CLASS_RULE = 059 HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class); 060 061 private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); 062 private static final Random RNG = new Random(); // Seed may be set with Random#setSeed 063 private static ZKReplicationPeerStorage STORAGE; 064 065 @BeforeClass 066 public static void setUp() throws Exception { 067 UTIL.startMiniZKCluster(); 068 STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 069 } 070 071 @AfterClass 072 public static void tearDown() throws IOException { 073 UTIL.shutdownMiniZKCluster(); 074 } 075 076 @After 077 public void cleanCustomConfigurations() { 078 UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 079 } 080 081 private Set<String> randNamespaces(Random rand) { 082 return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) 083 .collect(toSet()); 084 } 085 086 private Map<TableName, List<String>> randTableCFs(Random rand) { 087 int size = rand.nextInt(5); 088 Map<TableName, List<String>> map = new HashMap<>(); 089 for (int i = 0; i < size; i++) { 090 TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); 091 List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) 092 .limit(rand.nextInt(5)).collect(toList()); 093 map.put(tn, cfs); 094 } 095 return map; 096 } 097 098 private ReplicationPeerConfig getConfig(int seed) { 099 RNG.setSeed(seed); 100 return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) 101 .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) 102 .setNamespaces(randNamespaces(RNG)).setExcludeNamespaces(randNamespaces(RNG)) 103 .setTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean()) 104 .setBandwidth(RNG.nextInt(1000)).build(); 105 } 106 107 private void assertSetEquals(Set<String> expected, Set<String> actual) { 108 if (expected == null || expected.size() == 0) { 109 assertTrue(actual == null || actual.size() == 0); 110 return; 111 } 112 assertEquals(expected.size(), actual.size()); 113 expected.forEach(s -> assertTrue(actual.contains(s))); 114 } 115 116 private void assertMapEquals(Map<TableName, List<String>> expected, 117 Map<TableName, List<String>> actual) { 118 if (expected == null || expected.size() == 0) { 119 assertTrue(actual == null || actual.size() == 0); 120 return; 121 } 122 assertEquals(expected.size(), actual.size()); 123 expected.forEach((expectedTn, expectedCFs) -> { 124 List<String> actualCFs = actual.get(expectedTn); 125 if (expectedCFs == null || expectedCFs.size() == 0) { 126 assertTrue(actual.containsKey(expectedTn)); 127 assertTrue(actualCFs == null || actualCFs.size() == 0); 128 } else { 129 assertNotNull(actualCFs); 130 assertEquals(expectedCFs.size(), actualCFs.size()); 131 for (Iterator<String> expectedIt = expectedCFs.iterator(), 132 actualIt = actualCFs.iterator(); expectedIt.hasNext();) { 133 assertEquals(expectedIt.next(), actualIt.next()); 134 } 135 } 136 }); 137 } 138 139 private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { 140 assertEquals(expected.getClusterKey(), actual.getClusterKey()); 141 assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); 142 assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); 143 assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); 144 assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); 145 assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); 146 assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); 147 assertEquals(expected.getBandwidth(), actual.getBandwidth()); 148 } 149 150 @Test 151 public void test() throws ReplicationException { 152 int peerCount = 10; 153 for (int i = 0; i < peerCount; i++) { 154 STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); 155 } 156 List<String> peerIds = STORAGE.listPeerIds(); 157 assertEquals(peerCount, peerIds.size()); 158 for (String peerId : peerIds) { 159 int seed = Integer.parseInt(peerId); 160 assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); 161 } 162 for (int i = 0; i < peerCount; i++) { 163 STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); 164 } 165 for (String peerId : peerIds) { 166 int seed = Integer.parseInt(peerId); 167 assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); 168 } 169 for (int i = 0; i < peerCount; i++) { 170 assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); 171 } 172 for (int i = 0; i < peerCount; i++) { 173 STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); 174 } 175 for (int i = 0; i < peerCount; i++) { 176 assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); 177 } 178 String toRemove = Integer.toString(peerCount / 2); 179 STORAGE.removePeer(toRemove); 180 peerIds = STORAGE.listPeerIds(); 181 assertEquals(peerCount - 1, peerIds.size()); 182 assertFalse(peerIds.contains(toRemove)); 183 184 try { 185 STORAGE.getPeerConfig(toRemove); 186 fail("Should throw a ReplicationException when get peer config of a peerId"); 187 } catch (ReplicationException e) { 188 } 189 } 190 191 @Test 192 public void testBaseReplicationPeerConfig() throws ReplicationException { 193 String customPeerConfigKey = "hbase.xxx.custom_config"; 194 String customPeerConfigValue = "test"; 195 String customPeerConfigUpdatedValue = "testUpdated"; 196 197 String customPeerConfigSecondKey = "hbase.xxx.custom_second_config"; 198 String customPeerConfigSecondValue = "testSecond"; 199 String customPeerConfigSecondUpdatedValue = "testSecondUpdated"; 200 201 ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); 202 203 // custom config not present 204 assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); 205 206 Configuration conf = UTIL.getConfiguration(); 207 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 208 customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";") 209 .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue)); 210 211 ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil 212 .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); 213 214 // validates base configs are present in replicationPeerConfig 215 assertEquals(customPeerConfigValue, 216 updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); 217 assertEquals(customPeerConfigSecondValue, 218 updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey)); 219 220 // validates base configs get updated values even if config already present 221 conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 222 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 223 customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";") 224 .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue)); 225 226 ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil 227 .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig); 228 229 assertEquals(customPeerConfigUpdatedValue, 230 replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey)); 231 assertEquals(customPeerConfigSecondUpdatedValue, 232 replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey)); 233 } 234 235 @Test 236 public void testBaseReplicationRemovePeerConfig() throws ReplicationException { 237 String customPeerConfigKey = "hbase.xxx.custom_config"; 238 String customPeerConfigValue = "test"; 239 ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); 240 241 // custom config not present 242 assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); 243 244 Configuration conf = UTIL.getConfiguration(); 245 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 246 customPeerConfigKey.concat("=").concat(customPeerConfigValue)); 247 248 ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil 249 .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); 250 251 // validates base configs are present in replicationPeerConfig 252 assertEquals(customPeerConfigValue, 253 updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); 254 255 conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 256 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 257 customPeerConfigKey.concat("=").concat("")); 258 259 ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil 260 .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig); 261 262 assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey)); 263 } 264 265 @Test 266 public void testBaseReplicationRemovePeerConfigWithNoExistingConfig() 267 throws ReplicationException { 268 String customPeerConfigKey = "hbase.xxx.custom_config"; 269 ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); 270 271 // custom config not present 272 assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); 273 Configuration conf = UTIL.getConfiguration(); 274 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 275 customPeerConfigKey.concat("=").concat("")); 276 277 ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil 278 .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); 279 assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); 280 } 281 282 @Test 283 public void testPeerNameControl() throws Exception { 284 String clusterKey = "key"; 285 STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), 286 true); 287 288 try { 289 STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), 290 true); 291 fail(); 292 } catch (ReplicationException e) { 293 assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class)); 294 } finally { 295 // clean up 296 STORAGE.removePeer("6"); 297 } 298 } 299}