001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022import static org.junit.Assert.fail; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.concurrent.atomic.AtomicInteger; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.hbase.ChoreService; 031import org.apache.hadoop.hbase.ClusterId; 032import org.apache.hadoop.hbase.CoordinatedStateManager; 033import org.apache.hadoop.hbase.HBaseClassTestRule; 034import org.apache.hadoop.hbase.HBaseTestingUtility; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.Server; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.client.ClusterConnection; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.testclassification.MediumTests; 041import org.apache.hadoop.hbase.testclassification.ReplicationTests; 042import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; 043import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 044import org.apache.hadoop.hbase.zookeeper.ZKUtil; 045import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 046import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 047import org.junit.AfterClass; 048import org.junit.Before; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Test; 052import org.junit.experimental.categories.Category; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One 058 * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation 059 * of the rsZNode. All other znode creation/initialization is handled by the replication state 060 * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the 061 * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode). 062 */ 063@Category({ReplicationTests.class, MediumTests.class}) 064public class TestReplicationTrackerZKImpl { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class); 069 070 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class); 071 072 private static Configuration conf; 073 private static HBaseTestingUtility utility; 074 075 // Each one of the below variables are reinitialized before every test case 076 private ZKWatcher zkw; 077 private ReplicationPeers rp; 078 private ReplicationTracker rt; 079 private AtomicInteger rsRemovedCount; 080 private String rsRemovedData; 081 private AtomicInteger plChangedCount; 082 private List<String> plChangedData; 083 private AtomicInteger peerRemovedCount; 084 private String peerRemovedData; 085 086 @BeforeClass 087 public static void setUpBeforeClass() throws Exception { 088 utility = new HBaseTestingUtility(); 089 utility.startMiniZKCluster(); 090 conf = utility.getConfiguration(); 091 ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility); 092 ZKUtil.createWithParents(zk, zk.znodePaths.rsZNode); 093 } 094 095 @Before 096 public void setUp() throws Exception { 097 zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); 098 String fakeRs1 = ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234"); 099 try { 100 ZKClusterId.setClusterId(zkw, new ClusterId()); 101 rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); 102 rp.init(); 103 rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1)); 104 } catch (Exception e) { 105 fail("Exception during test setup: " + e); 106 } 107 rsRemovedCount = new AtomicInteger(0); 108 rsRemovedData = ""; 109 plChangedCount = new AtomicInteger(0); 110 plChangedData = new ArrayList<>(); 111 peerRemovedCount = new AtomicInteger(0); 112 peerRemovedData = ""; 113 } 114 115 @AfterClass 116 public static void tearDownAfterClass() throws Exception { 117 utility.shutdownMiniZKCluster(); 118 } 119 120 @Test 121 public void testGetListOfRegionServers() throws Exception { 122 // 0 region servers 123 assertEquals(0, rt.getListOfRegionServers().size()); 124 125 // 1 region server 126 ZKUtil.createWithParents(zkw, 127 ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234")); 128 assertEquals(1, rt.getListOfRegionServers().size()); 129 130 // 2 region servers 131 ZKUtil.createWithParents(zkw, 132 ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname2.example.org:1234")); 133 assertEquals(2, rt.getListOfRegionServers().size()); 134 135 // 1 region server 136 ZKUtil.deleteNode(zkw, 137 ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname2.example.org:1234")); 138 assertEquals(1, rt.getListOfRegionServers().size()); 139 140 // 0 region server 141 ZKUtil.deleteNode(zkw, 142 ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname1.example.org:1234")); 143 assertEquals(0, rt.getListOfRegionServers().size()); 144 } 145 146 @Test 147 public void testRegionServerRemovedEvent() throws Exception { 148 ZKUtil.createAndWatch(zkw, 149 ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname2.example.org:1234"), 150 HConstants.EMPTY_BYTE_ARRAY); 151 rt.registerListener(new DummyReplicationListener()); 152 // delete one 153 ZKUtil.deleteNode(zkw, 154 ZNodePaths.joinZNode(zkw.znodePaths.rsZNode, "hostname2.example.org:1234")); 155 // wait for event 156 while (rsRemovedCount.get() < 1) { 157 Thread.sleep(5); 158 } 159 assertEquals("hostname2.example.org:1234", rsRemovedData); 160 } 161 162 @Test 163 public void testPeerRemovedEvent() throws Exception { 164 rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); 165 rt.registerListener(new DummyReplicationListener()); 166 rp.unregisterPeer("5"); 167 // wait for event 168 while (peerRemovedCount.get() < 1) { 169 Thread.sleep(5); 170 } 171 assertEquals("5", peerRemovedData); 172 } 173 174 @Test 175 public void testPeerListChangedEvent() throws Exception { 176 // add a peer 177 rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); 178 zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true); 179 rt.registerListener(new DummyReplicationListener()); 180 rp.disablePeer("5"); 181 int tmp = plChangedCount.get(); 182 LOG.info("Peer count=" + tmp); 183 ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state"); 184 // wait for event 185 while (plChangedCount.get() <= tmp) { 186 Thread.sleep(100); 187 LOG.info("Peer count=" + tmp); 188 } 189 assertEquals(1, plChangedData.size()); 190 assertTrue(plChangedData.contains("5")); 191 192 // clean up 193 //ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5"); 194 rp.unregisterPeer("5"); 195 } 196 197 @Test 198 public void testPeerNameControl() throws Exception { 199 int exists = 0; 200 int hyphen = 0; 201 rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); 202 203 try{ 204 rp.registerPeer("6", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); 205 }catch(IllegalArgumentException e){ 206 exists++; 207 } 208 209 try{ 210 rp.registerPeer("6-ec2", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); 211 }catch(IllegalArgumentException e){ 212 hyphen++; 213 } 214 assertEquals(1, exists); 215 assertEquals(1, hyphen); 216 217 // clean up 218 rp.unregisterPeer("6"); 219 } 220 221 private class DummyReplicationListener implements ReplicationListener { 222 223 @Override 224 public void regionServerRemoved(String regionServer) { 225 rsRemovedData = regionServer; 226 rsRemovedCount.getAndIncrement(); 227 LOG.debug("Received regionServerRemoved event: " + regionServer); 228 } 229 230 @Override 231 public void peerRemoved(String peerId) { 232 peerRemovedData = peerId; 233 peerRemovedCount.getAndIncrement(); 234 LOG.debug("Received peerDisconnected event: " + peerId); 235 } 236 237 @Override 238 public void peerListChanged(List<String> peerIds) { 239 plChangedData.clear(); 240 plChangedData.addAll(peerIds); 241 int count = plChangedCount.getAndIncrement(); 242 LOG.debug("Received peerListChanged event " + count); 243 } 244 } 245 246 private class DummyServer implements Server { 247 private String serverName; 248 private boolean isAborted = false; 249 private boolean isStopped = false; 250 251 public DummyServer(String serverName) { 252 this.serverName = serverName; 253 } 254 255 @Override 256 public Configuration getConfiguration() { 257 return conf; 258 } 259 260 @Override 261 public ZKWatcher getZooKeeper() { 262 return zkw; 263 } 264 265 @Override 266 public CoordinatedStateManager getCoordinatedStateManager() { 267 return null; 268 } 269 270 @Override 271 public ClusterConnection getConnection() { 272 return null; 273 } 274 275 @Override 276 public MetaTableLocator getMetaTableLocator() { 277 return null; 278 } 279 280 @Override 281 public ServerName getServerName() { 282 return ServerName.valueOf(this.serverName); 283 } 284 285 @Override 286 public void abort(String why, Throwable e) { 287 LOG.info("Aborting " + serverName); 288 this.isAborted = true; 289 } 290 291 @Override 292 public boolean isAborted() { 293 return this.isAborted; 294 } 295 296 @Override 297 public void stop(String why) { 298 this.isStopped = true; 299 } 300 301 @Override 302 public boolean isStopped() { 303 return this.isStopped; 304 } 305 306 @Override 307 public ChoreService getChoreService() { 308 return null; 309 } 310 311 @Override 312 public ClusterConnection getClusterConnection() { 313 // TODO Auto-generated method stub 314 return null; 315 } 316 317 @Override 318 public FileSystem getFileSystem() { 319 return null; 320 } 321 322 @Override 323 public boolean isStopping() { 324 return false; 325 } 326 327 @Override 328 public Connection createConnection(Configuration conf) throws IOException { 329 return null; 330 } 331 } 332}