001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.fail; 022 023import java.io.IOException; 024import java.util.concurrent.atomic.AtomicInteger; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.hbase.ChoreService; 028import org.apache.hadoop.hbase.ClusterId; 029import org.apache.hadoop.hbase.CoordinatedStateManager; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.HConstants; 033import org.apache.hadoop.hbase.Server; 034import org.apache.hadoop.hbase.ServerName; 035import org.apache.hadoop.hbase.client.ClusterConnection; 036import org.apache.hadoop.hbase.client.Connection; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.ReplicationTests; 039import org.apache.hadoop.hbase.zookeeper.ZKClusterId; 040import org.apache.hadoop.hbase.zookeeper.ZKUtil; 041import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 042import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 043import org.apache.zookeeper.KeeperException; 044import org.junit.AfterClass; 045import org.junit.Before; 046import org.junit.BeforeClass; 047import org.junit.ClassRule; 048import org.junit.Test; 049import org.junit.experimental.categories.Category; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053/** 054 * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One 055 * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation 056 * of the rsZNode. All other znode creation/initialization is handled by the replication state 057 * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the 058 * MiniZKCluster is cleaned and returned to it's initial state (i.e. nothing but the rsZNode). 059 */ 060@Category({ReplicationTests.class, MediumTests.class}) 061public class TestReplicationTrackerZKImpl { 062 063 @ClassRule 064 public static final HBaseClassTestRule CLASS_RULE = 065 HBaseClassTestRule.forClass(TestReplicationTrackerZKImpl.class); 066 067 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationTrackerZKImpl.class); 068 069 private static Configuration conf; 070 private static HBaseTestingUtility utility; 071 072 // Each one of the below variables are reinitialized before every test case 073 private ZKWatcher zkw; 074 private ReplicationPeers rp; 075 private ReplicationTracker rt; 076 private AtomicInteger rsRemovedCount; 077 private String rsRemovedData; 078 079 @BeforeClass 080 public static void setUpBeforeClass() throws Exception { 081 utility = new HBaseTestingUtility(); 082 utility.startMiniZKCluster(); 083 conf = utility.getConfiguration(); 084 ZKWatcher zk = HBaseTestingUtility.getZooKeeperWatcher(utility); 085 ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode); 086 } 087 088 @Before 089 public void setUp() throws Exception { 090 zkw = HBaseTestingUtility.getZooKeeperWatcher(utility); 091 String fakeRs1 = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, 092 "hostname1.example.org:1234"); 093 try { 094 ZKClusterId.setClusterId(zkw, new ClusterId()); 095 rp = ReplicationFactory.getReplicationPeers(zkw, conf); 096 rp.init(); 097 rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1), 098 new DummyServer(fakeRs1)); 099 } catch (Exception e) { 100 fail("Exception during test setup: " + e); 101 } 102 rsRemovedCount = new AtomicInteger(0); 103 rsRemovedData = ""; 104 } 105 106 @AfterClass 107 public static void tearDownAfterClass() throws Exception { 108 utility.shutdownMiniZKCluster(); 109 } 110 111 @Test 112 public void testGetListOfRegionServers() throws Exception { 113 // 0 region servers 114 assertEquals(0, rt.getListOfRegionServers().size()); 115 116 // 1 region server 117 ZKUtil.createWithParents(zkw, 118 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234")); 119 assertEquals(1, rt.getListOfRegionServers().size()); 120 121 // 2 region servers 122 ZKUtil.createWithParents(zkw, 123 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 124 assertEquals(2, rt.getListOfRegionServers().size()); 125 126 // 1 region server 127 ZKUtil.deleteNode(zkw, 128 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 129 assertEquals(1, rt.getListOfRegionServers().size()); 130 131 // 0 region server 132 ZKUtil.deleteNode(zkw, 133 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname1.example.org:1234")); 134 assertEquals(0, rt.getListOfRegionServers().size()); 135 } 136 137 @Test 138 public void testRegionServerRemovedEvent() throws Exception { 139 ZKUtil.createAndWatch(zkw, 140 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234"), 141 HConstants.EMPTY_BYTE_ARRAY); 142 rt.registerListener(new DummyReplicationListener()); 143 // delete one 144 ZKUtil.deleteNode(zkw, 145 ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, "hostname2.example.org:1234")); 146 // wait for event 147 while (rsRemovedCount.get() < 1) { 148 Thread.sleep(5); 149 } 150 assertEquals("hostname2.example.org:1234", rsRemovedData); 151 } 152 153 @Test 154 public void testPeerNameControl() throws Exception { 155 int exists = 0; 156 rp.getPeerStorage().addPeer("6", 157 ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); 158 159 try { 160 rp.getPeerStorage().addPeer("6", 161 ReplicationPeerConfig.newBuilder().setClusterKey(utility.getClusterKey()).build(), true); 162 } catch (ReplicationException e) { 163 if (e.getCause() instanceof KeeperException.NodeExistsException) { 164 exists++; 165 } 166 } 167 168 assertEquals(1, exists); 169 170 // clean up 171 rp.getPeerStorage().removePeer("6"); 172 } 173 174 private class DummyReplicationListener implements ReplicationListener { 175 176 @Override 177 public void regionServerRemoved(String regionServer) { 178 rsRemovedData = regionServer; 179 rsRemovedCount.getAndIncrement(); 180 LOG.debug("Received regionServerRemoved event: " + regionServer); 181 } 182 } 183 184 private class DummyServer implements Server { 185 private String serverName; 186 private boolean isAborted = false; 187 private boolean isStopped = false; 188 189 public DummyServer(String serverName) { 190 this.serverName = serverName; 191 } 192 193 @Override 194 public Configuration getConfiguration() { 195 return conf; 196 } 197 198 @Override 199 public ZKWatcher getZooKeeper() { 200 return zkw; 201 } 202 203 @Override 204 public CoordinatedStateManager getCoordinatedStateManager() { 205 return null; 206 } 207 208 @Override 209 public ClusterConnection getConnection() { 210 return null; 211 } 212 213 @Override 214 public ServerName getServerName() { 215 return ServerName.valueOf(this.serverName); 216 } 217 218 @Override 219 public void abort(String why, Throwable e) { 220 LOG.info("Aborting " + serverName); 221 this.isAborted = true; 222 } 223 224 @Override 225 public boolean isAborted() { 226 return this.isAborted; 227 } 228 229 @Override 230 public void stop(String why) { 231 this.isStopped = true; 232 } 233 234 @Override 235 public boolean isStopped() { 236 return this.isStopped; 237 } 238 239 @Override 240 public ChoreService getChoreService() { 241 return null; 242 } 243 244 @Override 245 public ClusterConnection getClusterConnection() { 246 // TODO Auto-generated method stub 247 return null; 248 } 249 250 @Override 251 public FileSystem getFileSystem() { 252 return null; 253 } 254 255 @Override 256 public boolean isStopping() { 257 return false; 258 } 259 260 @Override 261 public Connection createConnection(Configuration conf) throws IOException { 262 return null; 263 } 264 } 265}