001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.List; 025import java.util.concurrent.atomic.AtomicInteger; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.hbase.ChoreService; 029import org.apache.hadoop.hbase.ClusterId; 030import org.apache.hadoop.hbase.CoordinatedStateManager; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.Server; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.client.ClusterConnection; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.testclassification.MediumTests; 039import org.apache.hadoop.hbase.testclassification.ReplicationTests; 040import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 041import org.apache.hadoop.hbase.zookeeper.ZKUtil; 042import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 043import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 044import org.apache.zookeeper.KeeperException; 045import org.junit.AfterClass; 046import org.junit.Before; 047import org.junit.BeforeClass; 048import org.junit.ClassRule; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One 056 * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation 057 * of the rsZNode. All other znode creation/initialization is handled by the replication state 058 * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the 059 * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode). 060 */ 061@Category({ReplicationTests.class, MediumTests.class}) 062public class TestReplicationTrackerZKImpl { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class); 067 068 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class); 069 070 private static Configuration conf; 071 private static HBaseTestingUtility utility; 072 073 // Each one of the below variables are reinitialized before every test case 074 private ZKWatcher zkw; 075 private ReplicationPeers rp; 076 private ReplicationTracker rt; 077 private AtomicInteger rsRemovedCount; 078 private String rsRemovedData; 079 080 @BeforeClass 081 public static void setUpBeforeClass() throws Exception { 082 utility = new HBaseTestingUtility(); 083 utility.startMiniZKCluster(); 084 conf = utility.getConfiguration(); 085 ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility); 086 ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode); 087 } 088 089 @Before 090 public void setUp() throws Exception { 091 zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); 092 String fakeRs1 = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, 093 "hostname1.example.org:1234"); 094 try { 095 ZKClusterId.setClusterId(zkw, new ClusterId()); 096 rp = ReplicationFactory.getReplicationPeers(zkw, conf); 097 rp.init(); 098 rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1), 099 new DummyServer(fakeRs1)); 100 } catch (Exception e) { 101 fail("Exception during test setup: " + e); 102 } 103 rsRemovedCount = new AtomicInteger(0); 104 rsRemovedData = ""; 105 } 106 107 @AfterClass 108 public static void tearDownAfterClass() throws Exception { 109 utility.shutdownMiniZKCluster(); 110 } 111 112 @Test 113 public void testGetListOfRegionServers() throws Exception { 114 // 0 region servers 115 assertEquals(0, rt.getListOfRegionServers().size()); 116 117 // 1 region server 118 ZKUtil.createWithParents(zkw, 119 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234")); 120 List<String> rss = rt.getListOfRegionServers(); 121 assertEquals(rss.toString(), 1, rss.size()); 122 123 // 2 region servers 124 ZKUtil.createWithParents(zkw, 125 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 126 rss = rt.getListOfRegionServers(); 127 assertEquals(rss.toString(), 2, rss.size()); 128 129 // 1 region server 130 ZKUtil.deleteNode(zkw, 131 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 132 rss = rt.getListOfRegionServers(); 133 assertEquals(1, rss.size()); 134 135 // 0 region server 136 ZKUtil.deleteNode(zkw, 137 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234")); 138 rss = rt.getListOfRegionServers(); 139 assertEquals(rss.toString(), 0, rss.size()); 140 } 141 142 @Test 143 public void testRegionServerRemovedEvent() throws Exception { 144 ZKUtil.createAndWatch(zkw, 145 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"), 146 HConstants.EMPTY_BYTE_ARRAY); 147 rt.registerListener(new DummyReplicationListener()); 148 // delete one 149 ZKUtil.deleteNode(zkw, 150 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 151 // wait for event 152 while (rsRemovedCount.get() < 1) { 153 Thread.sleep(5); 154 } 155 assertEquals("hostname2.example.org:1234", rsRemovedData); 156 } 157 158 @Test 159 public void testPeerNameControl() throws Exception { 160 int exists = 0; 161 rp.getPeerStorage().addPeer("6", 162 ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); 163 164 try { 165 rp.getPeerStorage().addPeer("6", 166 ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); 167 } catch (ReplicationException e) { 168 if (e.getCause() instanceof KeeperException.NodeExistsException) { 169 exists++; 170 } 171 } 172 173 assertEquals(1, exists); 174 175 // clean up 176 rp.getPeerStorage().removePeer("6"); 177 } 178 179 private class DummyReplicationListener implements ReplicationListener { 180 181 @Override 182 public void regionServerRemoved(String regionServer) { 183 rsRemovedData = regionServer; 184 rsRemovedCount.getAndIncrement(); 185 LOG.debug("Received regionServerRemoved event: " + regionServer); 186 } 187 } 188 189 private class DummyServer implements Server { 190 private String serverName; 191 private boolean isAborted = false; 192 private boolean isStopped = false; 193 194 public DummyServer(String serverName) { 195 this.serverName = serverName; 196 } 197 198 @Override 199 public Configuration getConfiguration() { 200 return conf; 201 } 202 203 @Override 204 public ZKWatcher getZooKeeper() { 205 return zkw; 206 } 207 208 @Override 209 public CoordinatedStateManager getCoordinatedStateManager() { 210 return null; 211 } 212 213 @Override 214 public ClusterConnection getConnection() { 215 return null; 216 } 217 218 @Override 219 public ServerName getServerName() { 220 return ServerName.valueOf(this.serverName); 221 } 222 223 @Override 224 public void abort(String why, Throwable e) { 225 LOG.info("Aborting " + serverName); 226 this.isAborted = true; 227 } 228 229 @Override 230 public boolean isAborted() { 231 return this.isAborted; 232 } 233 234 @Override 235 public void stop(String why) { 236 this.isStopped = true; 237 } 238 239 @Override 240 public boolean isStopped() { 241 return this.isStopped; 242 } 243 244 @Override 245 public ChoreService getChoreService() { 246 return null; 247 } 248 249 @Override 250 public ClusterConnection getClusterConnection() { 251 // TODO Auto-generated method stub 252 return null; 253 } 254 255 @Override 256 public FileSystem getFileSystem() { 257 return null; 258 } 259 260 @Override 261 public boolean isStopping() { 262 return false; 263 } 264 265 @Override 266 public Connection createConnection(Configuration conf) throws IOException { 267 return null; 268 } 269 } 270}