001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024 025import java.util.concurrent.atomic.AtomicBoolean; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseConfiguration; 029import org.apache.hadoop.hbase.HBaseZKTestingUtil; 030import org.apache.hadoop.hbase.Stoppable; 031import org.apache.hadoop.hbase.log.HBaseMarkers; 032import org.apache.hadoop.hbase.testclassification.MediumTests; 033import org.apache.hadoop.hbase.testclassification.ZKTests; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.Threads; 036import org.junit.jupiter.api.AfterAll; 037import org.junit.jupiter.api.BeforeAll; 038import org.junit.jupiter.api.Tag; 039import org.junit.jupiter.api.Test; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043@Tag(ZKTests.TAG) 044@Tag(MediumTests.TAG) 045public class TestZKLeaderManager { 046 047 private static final Logger LOG = LoggerFactory.getLogger(TestZKLeaderManager.class); 048 049 private static final String LEADER_ZNODE = "/test/" + TestZKLeaderManager.class.getSimpleName(); 050 051 private static class MockAbortable implements Abortable { 052 private boolean aborted; 053 054 @Override 055 public void abort(String why, Throwable e) { 056 aborted = true; 057 LOG.error(HBaseMarkers.FATAL, "Aborting during test: " + why, e); 058 fail("Aborted during test: " + why); 059 } 060 061 @Override 062 public boolean isAborted() { 063 return aborted; 064 } 065 } 066 067 private static class MockLeader extends Thread implements Stoppable { 068 private volatile boolean stopped; 069 private ZKWatcher watcher; 070 private ZKLeaderManager zkLeader; 071 private AtomicBoolean master = new AtomicBoolean(false); 072 private int index; 073 074 MockLeader(ZKWatcher watcher, int index) { 075 setDaemon(true); 076 setName("TestZKLeaderManager-leader-" + index); 077 this.index = index; 078 this.watcher = watcher; 079 this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE, Bytes.toBytes(index), this); 080 } 081 082 public boolean isMaster() { 083 return master.get(); 084 } 085 086 public int getIndex() { 087 return index; 088 } 089 090 public ZKWatcher getWatcher() { 091 return watcher; 092 } 093 094 @Override 095 public void run() { 096 while (!stopped) { 097 zkLeader.start(); 098 zkLeader.waitToBecomeLeader(); 099 master.set(true); 100 101 while (master.get() && !stopped) { 102 try { 103 Thread.sleep(10); 104 } catch (InterruptedException ignored) { 105 } 106 } 107 } 108 } 109 110 void abdicate() { 111 zkLeader.stepDownAsLeader(); 112 master.set(false); 113 } 114 115 @Override 116 public void stop(String why) { 117 stopped = true; 118 abdicate(); 119 Threads.sleep(100); 120 watcher.close(); 121 } 122 123 @Override 124 public boolean isStopped() { 125 return stopped; 126 } 127 } 128 129 private static HBaseZKTestingUtil TEST_UTIL; 130 private static MockLeader[] CANDIDATES; 131 132 @BeforeAll 133 public static void setupBeforeClass() throws Exception { 134 TEST_UTIL = new HBaseZKTestingUtil(); 135 TEST_UTIL.startMiniZKCluster(); 136 Configuration conf = TEST_UTIL.getConfiguration(); 137 138 // use an abortable to fail the test in the case of any KeeperExceptions 139 MockAbortable abortable = new MockAbortable(); 140 int count = 5; 141 CANDIDATES = new MockLeader[count]; 142 for (int i = 0; i < count; i++) { 143 ZKWatcher watcher = newZK(conf, "server" + i, abortable); 144 CANDIDATES[i] = new MockLeader(watcher, i); 145 CANDIDATES[i].start(); 146 } 147 } 148 149 @AfterAll 150 public static void tearDownAfterClass() throws Exception { 151 TEST_UTIL.shutdownMiniZKCluster(); 152 } 153 154 @Test 155 public void testLeaderSelection() throws Exception { 156 MockLeader currentLeader = getCurrentLeader(); 157 // one leader should have been found 158 assertNotNull(currentLeader, "Leader should exist"); 159 LOG.debug("Current leader index is " + currentLeader.getIndex()); 160 161 byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 162 assertNotNull(znodeData, "Leader znode should contain leader index"); 163 assertTrue(znodeData.length > 0, "Leader znode should not be empty"); 164 int storedIndex = Bytes.toInt(znodeData); 165 LOG.debug("Stored leader index in ZK is " + storedIndex); 166 assertEquals(currentLeader.getIndex(), storedIndex, "Leader znode should match leader index"); 167 168 // force a leader transition 169 currentLeader.abdicate(); 170 171 // check for new leader 172 currentLeader = getCurrentLeader(); 173 // one leader should have been found 174 assertNotNull(currentLeader, "New leader should exist after abdication"); 175 LOG.debug("New leader index is " + currentLeader.getIndex()); 176 177 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 178 assertNotNull(znodeData, "Leader znode should contain leader index"); 179 assertTrue(znodeData.length > 0, "Leader znode should not be empty"); 180 storedIndex = Bytes.toInt(znodeData); 181 LOG.debug("Stored leader index in ZK is " + storedIndex); 182 assertEquals(currentLeader.getIndex(), storedIndex, "Leader znode should match leader index"); 183 184 // force another transition by stopping the current 185 currentLeader.stop("Stopping for test"); 186 187 // check for new leader 188 currentLeader = getCurrentLeader(); 189 // one leader should have been found 190 assertNotNull(currentLeader, "New leader should exist after stop"); 191 LOG.debug("New leader index is " + currentLeader.getIndex()); 192 193 znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); 194 assertNotNull(znodeData, "Leader znode should contain leader index"); 195 assertTrue(znodeData.length > 0, "Leader znode should not be empty"); 196 storedIndex = Bytes.toInt(znodeData); 197 LOG.debug("Stored leader index in ZK is " + storedIndex); 198 assertEquals(currentLeader.getIndex(), storedIndex, "Leader znode should match leader index"); 199 200 // with a second stop we can guarantee that a previous leader has resumed leading 201 currentLeader.stop("Stopping for test"); 202 203 // check for new 204 currentLeader = getCurrentLeader(); 205 assertNotNull(currentLeader, "New leader should exist"); 206 } 207 208 private MockLeader getCurrentLeader() { 209 MockLeader currentLeader = null; 210 211 // Wait up to 10 secs for initial leader 212 for (int i = 0; i < 1000; i++) { 213 for (int j = 0; j < CANDIDATES.length; j++) { 214 if (CANDIDATES[j].isMaster()) { 215 // should only be one leader 216 if (currentLeader != null) { 217 fail( 218 "Both candidate " + currentLeader.getIndex() + " and " + j + " claim to be leader!"); 219 } 220 currentLeader = CANDIDATES[j]; 221 } 222 } 223 if (currentLeader != null) { 224 break; 225 } 226 Threads.sleep(100); 227 } 228 return currentLeader; 229 } 230 231 private static ZKWatcher newZK(Configuration conf, String name, Abortable abort) 232 throws Exception { 233 Configuration copy = HBaseConfiguration.create(conf); 234 return new ZKWatcher(copy, name, abort); 235 } 236}