001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNotNull; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024 025import java.util.concurrent.atomic.AtomicBoolean; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.HBaseZKTestingUtility; 031import org.apache.hadoop.hbase.Stoppable; 032import org.apache.hadoop.hbase.log.HBaseMarkers; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.testclassification.ZKTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.Threads; 037import org.junit.AfterClass; 038import org.junit.BeforeClass; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045@Category({ ZKTests.class, MediumTests.class }) 046public class TestZKLeaderManager { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestZKLeaderManager.class); 051 052 private static final Logger LOG = LoggerFactory.getLogger(TestZKLeaderManager.class); 053 054 private static final String LEADER_ZNODE = 055 "/test/" + TestZKLeaderManager.class.getSimpleName(); 056 057 private static class MockAbortable implements Abortable { 058 private boolean aborted; 059 060 @Override 061 public void abort(String why, Throwable e) { 062 aborted = true; 063 LOG.error(HBaseMarkers.FATAL, "Aborting during test: "+why, e); 064 fail("Aborted during test: " + why); 065 } 066 067 @Override 068 public boolean isAborted() { 069 return aborted; 070 } 071 } 072 073 private static class MockLeader extends Thread implements Stoppable { 074 private volatile boolean stopped; 075 private ZKWatcher watcher; 076 private ZKLeaderManager zkLeader; 077 private AtomicBoolean master = new AtomicBoolean(false); 078 private int index; 079 080 MockLeader(ZKWatcher watcher, int index) { 081 setDaemon(true); 082 setName("TestZKLeaderManager-leader-" + index); 083 this.index = index; 084 this.watcher = watcher; 085 this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE, 086 Bytes.toBytes(index), this); 087 } 088 089 public boolean isMaster() { 090 return master.get(); 091 } 092 093 public int getIndex() { 094 return index; 095 } 096 097 public ZKWatcher getWatcher() { 098 return watcher; 099 } 100 101 @Override 102 public void run() { 103 while (!stopped) { 104 zkLeader.start(); 105 zkLeader.waitToBecomeLeader(); 106 master.set(true); 107 108 while (master.get() && !stopped) { 109 try { 110 Thread.sleep(10); 111 } catch (InterruptedException ignored) {} 112 } 113 } 114 } 115 116 void abdicate() { 117 zkLeader.stepDownAsLeader(); 118 master.set(false); 119 } 120 121 @Override 122 public void stop(String why) { 123 stopped = true; 124 abdicate(); 125 Threads.sleep(100); 126 watcher.close(); 127 } 128 129 @Override 130 public boolean isStopped() { 131 return stopped; 132 } 133 } 134 135 private static HBaseZKTestingUtility TEST_UTIL; 136 private static MockLeader[] CANDIDATES; 137 138 @BeforeClass 139 public static void setupBeforeClass() throws Exception { 140 TEST_UTIL = new HBaseZKTestingUtility(); 141 TEST_UTIL.startMiniZKCluster(); 142 Configuration conf = TEST_UTIL.getConfiguration(); 143 144 // use an abortable to fail the test in the case of any KeeperExceptions 145 MockAbortable abortable = new MockAbortable(); 146 int count = 5; 147 CANDIDATES = new MockLeader[count]; 148 for (int i = 0; i < count; i++) { 149 ZKWatcher watcher = newZK(conf, "server"+i, abortable); 150 CANDIDATES[i] = new MockLeader(watcher, i); 151 CANDIDATES[i].start(); 152 } 153 } 154 155 @AfterClass 156 public static void tearDownAfterClass() throws Exception { 157 TEST_UTIL.shutdownMiniZKCluster(); 158 } 159 160 @Test 161 public void testLeaderSelection() throws Exception { 162 MockLeader currentLeader = getCurrentLeader(); 163 // one leader should have been found 164 assertNotNull("Leader should exist", currentLeader); 165 LOG.debug("Current leader index is "+currentLeader.getIndex()); 166 167 byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 168 assertNotNull("Leader znode should contain leader index", znodeData); 169 assertTrue("Leader znode should not be empty", znodeData.length > 0); 170 int storedIndex = Bytes.toInt(znodeData); 171 LOG.debug("Stored leader index in ZK is "+storedIndex); 172 assertEquals("Leader znode should match leader index", 173 currentLeader.getIndex(), storedIndex); 174 175 // force a leader transition 176 currentLeader.abdicate(); 177 178 // check for new leader 179 currentLeader = getCurrentLeader(); 180 // one leader should have been found 181 assertNotNull("New leader should exist after abdication", currentLeader); 182 LOG.debug("New leader index is "+currentLeader.getIndex()); 183 184 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 185 assertNotNull("Leader znode should contain leader index", znodeData); 186 assertTrue("Leader znode should not be empty", znodeData.length > 0); 187 storedIndex = Bytes.toInt(znodeData); 188 LOG.debug("Stored leader index in ZK is "+storedIndex); 189 assertEquals("Leader znode should match leader index", 190 currentLeader.getIndex(), storedIndex); 191 192 // force another transition by stopping the current 193 currentLeader.stop("Stopping for test"); 194 195 // check for new leader 196 currentLeader = getCurrentLeader(); 197 // one leader should have been found 198 assertNotNull("New leader should exist after stop", currentLeader); 199 LOG.debug("New leader index is "+currentLeader.getIndex()); 200 201 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 202 assertNotNull("Leader znode should contain leader index", znodeData); 203 assertTrue("Leader znode should not be empty", znodeData.length > 0); 204 storedIndex = Bytes.toInt(znodeData); 205 LOG.debug("Stored leader index in ZK is "+storedIndex); 206 assertEquals("Leader znode should match leader index", 207 currentLeader.getIndex(), storedIndex); 208 209 // with a second stop we can guarantee that a previous leader has resumed leading 210 currentLeader.stop("Stopping for test"); 211 212 // check for new 213 currentLeader = getCurrentLeader(); 214 assertNotNull("New leader should exist", currentLeader); 215 } 216 217 private MockLeader getCurrentLeader() { 218 MockLeader currentLeader = null; 219 220 // Wait up to 10 secs for initial leader 221 for (int i = 0; i < 1000; i++) { 222 for (int j = 0; j < CANDIDATES.length; j++) { 223 if (CANDIDATES[j].isMaster()) { 224 // should only be one leader 225 if (currentLeader != null) { 226 fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!"); 227 } 228 currentLeader = CANDIDATES[j]; 229 } 230 } 231 if (currentLeader != null) { 232 break; 233 } 234 Threads.sleep(100); 235 } 236 return currentLeader; 237 } 238 239 private static ZKWatcher newZK(Configuration conf, String name, Abortable abort) 240 throws Exception { 241 Configuration copy = HBaseConfiguration.create(conf); 242 return new ZKWatcher(copy, name, abort); 243 } 244}