001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static java.util.stream.Collectors.toList; 021import static java.util.stream.Collectors.toSet; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertFalse; 024import static org.junit.Assert.assertNotNull; 025import static org.junit.Assert.assertTrue; 026import static org.junit.Assert.fail; 027 028import java.io.IOException; 029import java.util.HashMap; 030import java.util.Iterator; 031import java.util.List; 032import java.util.Map; 033import java.util.Random; 034import java.util.Set; 035import java.util.stream.Stream; 036import org.apache.hadoop.hbase.HBaseClassTestRule; 037import org.apache.hadoop.hbase.HBaseZKTestingUtility; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.testclassification.ReplicationTests; 041import org.junit.AfterClass; 042import org.junit.BeforeClass; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046 047@Category({ ReplicationTests.class, MediumTests.class }) 048public class TestZKReplicationPeerStorage { 049 050 @ClassRule 051 public static final HBaseClassTestRule CLASS_RULE = 052 HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class); 053 054 private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); 055 056 private static ZKReplicationPeerStorage STORAGE; 057 058 @BeforeClass 059 public static void setUp() throws Exception { 060 UTIL.startMiniZKCluster(); 061 STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); 062 } 063 064 @AfterClass 065 public static void tearDown() throws IOException { 066 UTIL.shutdownMiniZKCluster(); 067 } 068 069 private Set<String> randNamespaces(Random rand) { 070 return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) 071 .collect(toSet()); 072 } 073 074 private Map<TableName, List<String>> randTableCFs(Random rand) { 075 int size = rand.nextInt(5); 076 Map<TableName, List<String>> map = new HashMap<>(); 077 for (int i = 0; i < size; i++) { 078 TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); 079 List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) 080 .limit(rand.nextInt(5)).collect(toList()); 081 map.put(tn, cfs); 082 } 083 return map; 084 } 085 086 private ReplicationPeerConfig getConfig(int seed) { 087 Random rand = new Random(seed); 088 return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong())) 089 .setReplicationEndpointImpl(Long.toHexString(rand.nextLong())) 090 .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand)) 091 .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) 092 .setBandwidth(rand.nextInt(1000)).build(); 093 } 094 095 private void assertSetEquals(Set<String> expected, Set<String> actual) { 096 if (expected == null || expected.size() == 0) { 097 assertTrue(actual == null || actual.size() == 0); 098 return; 099 } 100 assertEquals(expected.size(), actual.size()); 101 expected.forEach(s -> assertTrue(actual.contains(s))); 102 } 103 104 private void assertMapEquals(Map<TableName, List<String>> expected, 105 Map<TableName, List<String>> actual) { 106 if (expected == null || expected.size() == 0) { 107 assertTrue(actual == null || actual.size() == 0); 108 return; 109 } 110 assertEquals(expected.size(), actual.size()); 111 expected.forEach((expectedTn, expectedCFs) -> { 112 List<String> actualCFs = actual.get(expectedTn); 113 if (expectedCFs == null || expectedCFs.size() == 0) { 114 assertTrue(actual.containsKey(expectedTn)); 115 assertTrue(actualCFs == null || actualCFs.size() == 0); 116 } else { 117 assertNotNull(actualCFs); 118 assertEquals(expectedCFs.size(), actualCFs.size()); 119 for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator(); 120 expectedIt.hasNext();) { 121 assertEquals(expectedIt.next(), actualIt.next()); 122 } 123 } 124 }); 125 } 126 127 private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { 128 assertEquals(expected.getClusterKey(), actual.getClusterKey()); 129 assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); 130 assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); 131 assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); 132 assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); 133 assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); 134 assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); 135 assertEquals(expected.getBandwidth(), actual.getBandwidth()); 136 } 137 138 @Test 139 public void test() throws ReplicationException { 140 int peerCount = 10; 141 for (int i = 0; i < peerCount; i++) { 142 STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); 143 } 144 List<String> peerIds = STORAGE.listPeerIds(); 145 assertEquals(peerCount, peerIds.size()); 146 for (String peerId : peerIds) { 147 int seed = Integer.parseInt(peerId); 148 assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); 149 } 150 for (int i = 0; i < peerCount; i++) { 151 STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); 152 } 153 for (String peerId : peerIds) { 154 int seed = Integer.parseInt(peerId); 155 assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); 156 } 157 for (int i = 0; i < peerCount; i++) { 158 assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); 159 } 160 for (int i = 0; i < peerCount; i++) { 161 STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); 162 } 163 for (int i = 0; i < peerCount; i++) { 164 assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); 165 } 166 String toRemove = Integer.toString(peerCount / 2); 167 STORAGE.removePeer(toRemove); 168 peerIds = STORAGE.listPeerIds(); 169 assertEquals(peerCount - 1, peerIds.size()); 170 assertFalse(peerIds.contains(toRemove)); 171 172 try { 173 STORAGE.getPeerConfig(toRemove); 174 fail("Should throw a ReplicationException when get peer config of a peerId"); 175 } catch (ReplicationException e) { 176 } 177 } 178}