001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.concurrent.atomic.AtomicInteger; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.hbase.ChoreService; 028import org.apache.hadoop.hbase.ClusterId; 029import org.apache.hadoop.hbase.CoordinatedStateManager; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.Server; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.client.ClusterConnection; 036import org.apache.hadoop.hbase.client.Connection; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.ReplicationTests; 039import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 040import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 041import org.apache.hadoop.hbase.zookeeper.ZKUtil; 042import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 043import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 044import org.apache.zookeeper.KeeperException; 045import org.junit.AfterClass; 046import org.junit.Before; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One 056 * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation 057 * of the rsZNode. All other znode creation/initialization is handled by the replication state 058 * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the 059 * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode). 060 */ 061@Category({ReplicationTests.class, MediumTests.class}) 062public class TestReplicationTrackerZKImpl { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class); 067 068 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class); 069 070 private static Configuration conf; 071 private static HBaseTestingUtility utility; 072 073 // Each one of the below variables are reinitialized before every test case 074 private ZKWatcher zkw; 075 private ReplicationPeers rp; 076 private ReplicationTracker rt; 077 private AtomicInteger rsRemovedCount; 078 private String rsRemovedData; 079 080 @BeforeClass 081 public static void setUpBeforeClass() throws Exception { 082 utility = new HBaseTestingUtility(); 083 utility.startMiniZKCluster(); 084 conf = utility.getConfiguration(); 085 ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility); 086 ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode); 087 } 088 089 @Before 090 public void setUp() throws Exception { 091 zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); 092 String fakeRs1 = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, 093 "hostname1.example.org:1234"); 094 try { 095 ZKClusterId.setClusterId(zkw, new ClusterId()); 096 rp = ReplicationFactory.getReplicationPeers(zkw, conf); 097 rp.init(); 098 rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1), 099 new DummyServer(fakeRs1)); 100 } catch (Exception e) { 101 fail("Exception during test setup: " + e); 102 } 103 rsRemovedCount = new AtomicInteger(0); 104 rsRemovedData = ""; 105 } 106 107 @AfterClass 108 public static void tearDownAfterClass() throws Exception { 109 utility.shutdownMiniZKCluster(); 110 } 111 112 @Test 113 public void testGetListOfRegionServers() throws Exception { 114 // 0 region servers 115 assertEquals(0, rt.getListOfRegionServers().size()); 116 117 // 1 region server 118 ZKUtil.createWithParents(zkw, 119 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234")); 120 assertEquals(1, rt.getListOfRegionServers().size()); 121 122 // 2 region servers 123 ZKUtil.createWithParents(zkw, 124 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 125 assertEquals(2, rt.getListOfRegionServers().size()); 126 127 // 1 region server 128 ZKUtil.deleteNode(zkw, 129 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 130 assertEquals(1, rt.getListOfRegionServers().size()); 131 132 // 0 region server 133 ZKUtil.deleteNode(zkw, 134 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234")); 135 assertEquals(0, rt.getListOfRegionServers().size()); 136 } 137 138 @Test 139 public void testRegionServerRemovedEvent() throws Exception { 140 ZKUtil.createAndWatch(zkw, 141 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"), 142 HConstants.EMPTY_BYTE_ARRAY); 143 rt.registerListener(new DummyReplicationListener()); 144 // delete one 145 ZKUtil.deleteNode(zkw, 146 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 147 // wait for event 148 while (rsRemovedCount.get() < 1) { 149 Thread.sleep(5); 150 } 151 assertEquals("hostname2.example.org:1234", rsRemovedData); 152 } 153 154 @Test 155 public void testPeerNameControl() throws Exception { 156 int exists = 0; 157 rp.getPeerStorage().addPeer("6", 158 ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); 159 160 try { 161 rp.getPeerStorage().addPeer("6", 162 ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); 163 } catch (ReplicationException e) { 164 if (e.getCause() instanceof KeeperException.NodeExistsException) { 165 exists++; 166 } 167 } 168 169 assertEquals(1, exists); 170 171 // clean up 172 rp.getPeerStorage().removePeer("6"); 173 } 174 175 private class DummyReplicationListener implements ReplicationListener { 176 177 @Override 178 public void regionServerRemoved(String regionServer) { 179 rsRemovedData = regionServer; 180 rsRemovedCount.getAndIncrement(); 181 LOG.debug("Received regionServerRemoved event: " + regionServer); 182 } 183 } 184 185 private class DummyServer implements Server { 186 private String serverName; 187 private boolean isAborted = false; 188 private boolean isStopped = false; 189 190 public DummyServer(String serverName) { 191 this.serverName = serverName; 192 } 193 194 @Override 195 public Configuration getConfiguration() { 196 return conf; 197 } 198 199 @Override 200 public ZKWatcher getZooKeeper() { 201 return zkw; 202 } 203 204 @Override 205 public CoordinatedStateManager getCoordinatedStateManager() { 206 return null; 207 } 208 209 @Override 210 public ClusterConnection getConnection() { 211 return null; 212 } 213 214 @Override 215 public MetaTableLocator getMetaTableLocator() { 216 return null; 217 } 218 219 @Override 220 public ServerName getServerName() { 221 return ServerName.valueOf(this.serverName); 222 } 223 224 @Override 225 public void abort(String why, Throwable e) { 226 LOG.info("Aborting " + serverName); 227 this.isAborted = true; 228 } 229 230 @Override 231 public boolean isAborted() { 232 return this.isAborted; 233 } 234 235 @Override 236 public void stop(String why) { 237 this.isStopped = true; 238 } 239 240 @Override 241 public boolean isStopped() { 242 return this.isStopped; 243 } 244 245 @Override 246 public ChoreService getChoreService() { 247 return null; 248 } 249 250 @Override 251 public ClusterConnection getClusterConnection() { 252 // TODO Auto-generated method stub 253 return null; 254 } 255 256 @Override 257 public FileSystem getFileSystem() { 258 return null; 259 } 260 261 @Override 262 public boolean isStopping() { 263 return false; 264 } 265 266 @Override 267 public Connection createConnection(Configuration conf) throws IOException { 268 return null; 269 } 270 } 271}