001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static java.util.stream.Collectors.toList; 021import static java.util.stream.Collectors.toSet; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertNull; 026import static org.junit.Assert.assertTrue; 027import static org.junit.Assert.fail; 028 029import java.io.IOException; 030import java.util.HashMap; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.Random; 035import java.util.Set; 036import java.util.stream.Stream; 037 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseZKTestingUtility; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 043import org.apache.hadoop.hbase.testclassification.MediumTests; 044import org.apache.hadoop.hbase.testclassification.ReplicationTests; 045import org.junit.AfterClass; 046import org.junit.BeforeClass; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050 051@Category({ ReplicationTests.class, MediumTests.class }) 052public class TestZKReplicationPeerStorage { 053 054 @ClassRule 055 public static final HBaseClassTestRule CLASS_RULE = 056 HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class); 057 058 private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); 059 060 private static ZKReplicationPeerStorage STORAGE; 061 062 @BeforeClass 063 public static void setUp() throws Exception { 064 UTIL.startMiniZKCluster(); 065 STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 066 } 067 068 @AfterClass 069 public static void tearDown() throws IOException { 070 UTIL.shutdownMiniZKCluster(); 071 } 072 073 private Set<String> randNamespaces(Random rand) { 074 return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) 075 .collect(toSet()); 076 } 077 078 private Map<TableName, List<String>> randTableCFs(Random rand) { 079 int size = rand.nextInt(5); 080 Map<TableName, List<String>> map = new HashMap<>(); 081 for (int i = 0; i < size; i++) { 082 TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); 083 List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) 084 .limit(rand.nextInt(5)).collect(toList()); 085 map.put(tn, cfs); 086 } 087 return map; 088 } 089 090 private ReplicationPeerConfig getConfig(int seed) { 091 Random rand = new Random(seed); 092 return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong())) 093 .setReplicationEndpointImpl(Long.toHexString(rand.nextLong())) 094 .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand)) 095 .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) 096 .setBandwidth(rand.nextInt(1000)).build(); 097 } 098 099 private void assertSetEquals(Set<String> expected, Set<String> actual) { 100 if (expected == null || expected.size() == 0) { 101 assertTrue(actual == null || actual.size() == 0); 102 return; 103 } 104 assertEquals(expected.size(), actual.size()); 105 expected.forEach(s -> assertTrue(actual.contains(s))); 106 } 107 108 private void assertMapEquals(Map<TableName, List<String>> expected, 109 Map<TableName, List<String>> actual) { 110 if (expected == null || expected.size() == 0) { 111 assertTrue(actual == null || actual.size() == 0); 112 return; 113 } 114 assertEquals(expected.size(), actual.size()); 115 expected.forEach((expectedTn, expectedCFs) -> { 116 List<String> actualCFs = actual.get(expectedTn); 117 if (expectedCFs == null || expectedCFs.size() == 0) { 118 assertTrue(actual.containsKey(expectedTn)); 119 assertTrue(actualCFs == null || actualCFs.size() == 0); 120 } else { 121 assertNotNull(actualCFs); 122 assertEquals(expectedCFs.size(), actualCFs.size()); 123 for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator(); 124 expectedIt.hasNext();) { 125 assertEquals(expectedIt.next(), actualIt.next()); 126 } 127 } 128 }); 129 } 130 131 private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { 132 assertEquals(expected.getClusterKey(), actual.getClusterKey()); 133 assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); 134 assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); 135 assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); 136 assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); 137 assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); 138 assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); 139 assertEquals(expected.getBandwidth(), actual.getBandwidth()); 140 } 141 142 @Test 143 public void test() throws ReplicationException { 144 int peerCount = 10; 145 for (int i = 0; i < peerCount; i++) { 146 STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); 147 } 148 List<String> peerIds = STORAGE.listPeerIds(); 149 assertEquals(peerCount, peerIds.size()); 150 for (String peerId : peerIds) { 151 int seed = Integer.parseInt(peerId); 152 assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); 153 } 154 for (int i = 0; i < peerCount; i++) { 155 STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); 156 } 157 for (String peerId : peerIds) { 158 int seed = Integer.parseInt(peerId); 159 assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); 160 } 161 for (int i = 0; i < peerCount; i++) { 162 assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); 163 } 164 for (int i = 0; i < peerCount; i++) { 165 STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); 166 } 167 for (int i = 0; i < peerCount; i++) { 168 assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); 169 } 170 String toRemove = Integer.toString(peerCount / 2); 171 STORAGE.removePeer(toRemove); 172 peerIds = STORAGE.listPeerIds(); 173 assertEquals(peerCount - 1, peerIds.size()); 174 assertFalse(peerIds.contains(toRemove)); 175 176 try { 177 STORAGE.getPeerConfig(toRemove); 178 fail("Should throw a ReplicationException when get peer config of a peerId"); 179 } catch (ReplicationException e) { 180 } 181 } 182 183 @Test 184 public void testBaseReplicationPeerConfig() { 185 String customPeerConfigKey = "hbase.xxx.custom_config"; 186 String customPeerConfigValue = "test"; 187 String customPeerConfigUpdatedValue = "testUpdated"; 188 189 String customPeerConfigSecondKey = "hbase.xxx.custom_second_config"; 190 String customPeerConfigSecondValue = "testSecond"; 191 String customPeerConfigSecondUpdatedValue = "testSecondUpdated"; 192 193 ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1); 194 195 // custom config not present 196 assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null); 197 198 Configuration conf = UTIL.getConfiguration(); 199 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 200 customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";"). 201 concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue)); 202 203 ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil. 204 addBasePeerConfigsIfNotPresent(conf,existingReplicationPeerConfig); 205 206 // validates base configs are present in replicationPeerConfig 207 assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration(). 208 get(customPeerConfigKey)); 209 assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration(). 210 get(customPeerConfigSecondKey)); 211 212 // validates base configs does not override value if config already present 213 conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG, 214 customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";"). 215 concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue)); 216 217 ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil. 218 addBasePeerConfigsIfNotPresent(conf,updatedReplicationPeerConfig); 219 220 assertEquals(customPeerConfigValue, replicationPeerConfigAfterValueUpdate. 221 getConfiguration().get(customPeerConfigKey)); 222 assertEquals(customPeerConfigSecondValue, replicationPeerConfigAfterValueUpdate. 223 getConfiguration().get(customPeerConfigSecondKey)); 224 } 225}