001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static java.util.stream.Collectors.toList; 021import static java.util.stream.Collectors.toSet; 022import static org.hamcrest.CoreMatchers.instanceOf; 023import static org.hamcrest.MatcherAssert.assertThat; 024import static org.junit.Assert.assertEquals; 025import static org.junit.Assert.assertFalse; 026import static org.junit.Assert.assertNotEquals; 027import static org.junit.Assert.assertNotNull; 028import static org.junit.Assert.assertNull; 029import static org.junit.Assert.assertTrue; 030import static org.junit.Assert.fail; 031 032import java.io.IOException; 033import java.util.HashMap; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Random; 038import java.util.Set; 039import java.util.stream.Stream; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.HBaseClassTestRule; 042import org.apache.hadoop.hbase.HBaseZKTestingUtil; 043import org.apache.hadoop.hbase.TableName; 044import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.ReplicationTests; 047import org.apache.hadoop.hbase.zookeeper.ZKUtil; 048import org.apache.zookeeper.KeeperException; 049import org.junit.After; 050import org.junit.AfterClass; 051import org.junit.BeforeClass; 052import org.junit.ClassRule; 053import org.junit.Test; 054import org.junit.experimental.categories.Category; 055 056@Category({ ReplicationTests.class, MediumTests.class }) 057public class TestZKReplicationPeerStorage { 058 059 @ClassRule 060 public static final HBaseClassTestRule CLASS_RULE = 061 HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class); 062 063 private static final HBaseZKTestingUtil UTIL = new HBaseZKTestingUtil(); 064 private static final Random RNG = new Random(); // Seed may be set with Random#setSeed 065 private static ZKReplicationPeerStorage STORAGE; 066 067 @BeforeClass 068 public static void setUp() throws Exception { 069 UTIL.startMiniZKCluster(); 070 STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 071 } 072 073 @AfterClass 074 public static void tearDown() throws IOException { 075 UTIL.shutdownMiniZKCluster(); 076 } 077 078 @After 079 public void cleanCustomConfigurations() { 080 UTIL.getConfiguration().unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 081 } 082 083 private Set<String> randNamespaces(Random rand) { 084 return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) 085 .collect(toSet()); 086 } 087 088 private Map<TableName, List<String>> randTableCFs(Random rand) { 089 int size = rand.nextInt(5); 090 Map<TableName, List<String>> map = new HashMap<>(); 091 for (int i = 0; i < size; i++) { 092 TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); 093 List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) 094 .limit(rand.nextInt(5)).collect(toList()); 095 map.put(tn, cfs); 096 } 097 return map; 098 } 099 100 private ReplicationPeerConfig getConfig(int seed) { 101 RNG.setSeed(seed); 102 return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(RNG.nextLong())) 103 .setReplicationEndpointImpl(Long.toHexString(RNG.nextLong())) 104 .setRemoteWALDir(Long.toHexString(RNG.nextLong())).setNamespaces(randNamespaces(RNG)) 105 .setExcludeNamespaces(randNamespaces(RNG)).setTableCFsMap(randTableCFs(RNG)) 106 .setExcludeTableCFsMap(randTableCFs(RNG)).setReplicateAllUserTables(RNG.nextBoolean()) 107 .setBandwidth(RNG.nextInt(1000)).build(); 108 } 109 110 private void assertSetEquals(Set<String> expected, Set<String> actual) { 111 if (expected == null || expected.size() == 0) { 112 assertTrue(actual == null || actual.size() == 0); 113 return; 114 } 115 assertEquals(expected.size(), actual.size()); 116 expected.forEach(s -> assertTrue(actual.contains(s))); 117 } 118 119 private void assertMapEquals(Map<TableName, List<String>> expected, 120 Map<TableName, List<String>> actual) { 121 if (expected == null || expected.size() == 0) { 122 assertTrue(actual == null || actual.size() == 0); 123 return; 124 } 125 assertEquals(expected.size(), actual.size()); 126 expected.forEach((expectedTn, expectedCFs) -> { 127 List<String> actualCFs = actual.get(expectedTn); 128 if (expectedCFs == null || expectedCFs.size() == 0) { 129 assertTrue(actual.containsKey(expectedTn)); 130 assertTrue(actualCFs == null || actualCFs.size() == 0); 131 } else { 132 assertNotNull(actualCFs); 133 assertEquals(expectedCFs.size(), actualCFs.size()); 134 for (Iterator<String> expectedIt = expectedCFs.iterator(), 135 actualIt = actualCFs.iterator(); expectedIt.hasNext();) { 136 assertEquals(expectedIt.next(), actualIt.next()); 137 } 138 } 139 }); 140 } 141 142 private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { 143 assertEquals(expected.getClusterKey(), actual.getClusterKey()); 144 assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); 145 assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); 146 assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); 147 assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); 148 assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); 149 assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); 150 assertEquals(expected.getBandwidth(), actual.getBandwidth()); 151 } 152 153 @Test 154 public void test() throws ReplicationException { 155 int peerCount = 10; 156 for (int i = 0; i < peerCount; i++) { 157 STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0, 158 SyncReplicationState.valueOf(i % 4)); 159 } 160 List<String> peerIds = STORAGE.listPeerIds(); 161 assertEquals(peerCount, peerIds.size()); 162 for (String peerId : peerIds) { 163 int seed = Integer.parseInt(peerId); 164 assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); 165 } 166 for (int i = 0; i < peerCount; i++) { 167 STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); 168 } 169 for (String peerId : peerIds) { 170 int seed = Integer.parseInt(peerId); 171 assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); 172 } 173 for (int i = 0; i < peerCount; i++) { 174 assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); 175 } 176 for (int i = 0; i < peerCount; i++) { 177 STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); 178 } 179 for (int i = 0; i < peerCount; i++) { 180 assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); 181 } 182 for (int i = 0; i < peerCount; i++) { 183 assertEquals(SyncReplicationState.valueOf(i % 4), 184 STORAGE.getPeerSyncReplicationState(Integer.toString(i))); 185 } 186 String toRemove = Integer.toString(peerCount / 2); 187 STORAGE.removePeer(toRemove); 188 peerIds = STORAGE.listPeerIds(); 189 assertEquals(peerCount - 1, peerIds.size()); 190 assertFalse(peerIds.contains(toRemove)); 191 192 try { 193 STORAGE.getPeerConfig(toRemove); 194 fail("Should throw a ReplicationException when getting peer config of a removed peer"); 195 } catch (ReplicationException e) { 196 } 197 } 198 199 @Test 200 public void testNoSyncReplicationState() 201 throws ReplicationException, KeeperException, IOException { 202 // This could happen for a peer created before we introduce sync replication. 203 String peerId = "testNoSyncReplicationState"; 204 try { 205 STORAGE.getPeerSyncReplicationState(peerId); 206 fail("Should throw a ReplicationException when getting state of inexist peer"); 207 } catch (ReplicationException e) { 208 // expected 209 } 210 try { 211 STORAGE.getPeerNewSyncReplicationState(peerId); 212 fail("Should throw a ReplicationException when getting state of inexist peer"); 213 } catch (ReplicationException e) { 214 // expected 215 } 216 STORAGE.addPeer(peerId, getConfig(0), true, SyncReplicationState.NONE); 217 // delete the sync replication state node to simulate 218 ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId)); 219 ZKUtil.deleteNode(UTIL.getZooKeeperWatcher(), STORAGE.getNewSyncReplicationStateNode(peerId)); 220 // should not throw exception as the peer exists 221 assertEquals(SyncReplicationState.NONE, STORAGE.getPeerSyncReplicationState(peerId)); 222 assertEquals(SyncReplicationState.NONE, STORAGE.getPeerNewSyncReplicationState(peerId)); 223 // make sure we create the node for the old format peer 224 assertNotEquals(-1, 225 ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), STORAGE.getSyncReplicationStateNode(peerId))); 226 assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(), 227 STORAGE.getNewSyncReplicationStateNode(peerId))); 228 } 229 230 @Test 231 public void testBaseReplicationPeerConfig() throws ReplicationException { 232 String customPeerConfigKey = "hbase.xxx.custom_config"; 233 String customPeerConfigValue = "test"; 234 String customPeerConfigUpdatedValue = "testUpdated"; 235 236 String customPeerConfigSecondKey = "hbase.xxx.custom_second_config"; 237 String customPeerConfigSecondValue = "testSecond"; 238 String customPeerConfigSecondUpdatedValue = "testSecondUpdated"; 239 240 ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); 241 242 // custom config not present 243 assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); 244 245 Configuration conf = UTIL.getConfiguration(); 246 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 247 customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";") 248 .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue)); 249 250 ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil 251 .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); 252 253 // validates base configs are present in replicationPeerConfig 254 assertEquals(customPeerConfigValue, 255 updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); 256 assertEquals(customPeerConfigSecondValue, 257 updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigSecondKey)); 258 259 // validates base configs get updated values even if config already present 260 conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 261 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 262 customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";") 263 .concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue)); 264 265 ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil 266 .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig); 267 268 assertEquals(customPeerConfigUpdatedValue, 269 replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigKey)); 270 assertEquals(customPeerConfigSecondUpdatedValue, 271 replicationPeerConfigAfterValueUpdate.getConfiguration().get(customPeerConfigSecondKey)); 272 } 273 274 @Test 275 public void testBaseReplicationRemovePeerConfig() throws ReplicationException { 276 String customPeerConfigKey = "hbase.xxx.custom_config"; 277 String customPeerConfigValue = "test"; 278 ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); 279 280 // custom config not present 281 assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); 282 283 Configuration conf = UTIL.getConfiguration(); 284 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 285 customPeerConfigKey.concat("=").concat(customPeerConfigValue)); 286 287 ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil 288 .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); 289 290 // validates base configs are present in replicationPeerConfig 291 assertEquals(customPeerConfigValue, 292 updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); 293 294 conf.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG); 295 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 296 customPeerConfigKey.concat("=").concat("")); 297 298 ReplicationPeerConfig replicationPeerConfigRemoved = ReplicationPeerConfigUtil 299 .updateReplicationBasePeerConfigs(conf, updatedReplicationPeerConfig); 300 301 assertNull(replicationPeerConfigRemoved.getConfiguration().get(customPeerConfigKey)); 302 } 303 304 @Test 305 public void testBaseReplicationRemovePeerConfigWithNoExistingConfig() 306 throws ReplicationException { 307 String customPeerConfigKey = "hbase.xxx.custom_config"; 308 ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); 309 310 // custom config not present 311 assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); 312 Configuration conf = UTIL.getConfiguration(); 313 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 314 customPeerConfigKey.concat("=").concat("")); 315 316 ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil 317 .updateReplicationBasePeerConfigs(conf, existingReplicationPeerConfig); 318 assertNull(updatedReplicationPeerConfig.getConfiguration().get(customPeerConfigKey)); 319 } 320 321 @Test 322 public void testPeerNameControl() throws Exception { 323 String clusterKey = "key"; 324 STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), true, 325 SyncReplicationState.NONE); 326 327 try { 328 STORAGE.addPeer("6", ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).build(), 329 true, SyncReplicationState.NONE); 330 fail(); 331 } catch (ReplicationException e) { 332 assertThat(e.getCause(), instanceOf(KeeperException.NodeExistsException.class)); 333 } finally { 334 // clean up 335 STORAGE.removePeer("6"); 336 } 337 } 338}