001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023import static org.junit.jupiter.api.Assertions.fail;
024
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HBaseZKTestingUtil;
030import org.apache.hadoop.hbase.Stoppable;
031import org.apache.hadoop.hbase.log.HBaseMarkers;
032import org.apache.hadoop.hbase.testclassification.MediumTests;
033import org.apache.hadoop.hbase.testclassification.ZKTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.Threads;
036import org.junit.jupiter.api.AfterAll;
037import org.junit.jupiter.api.BeforeAll;
038import org.junit.jupiter.api.Tag;
039import org.junit.jupiter.api.Test;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Tag(ZKTests.TAG)
044@Tag(MediumTests.TAG)
045public class TestZKLeaderManager {
046
047  private static final Logger LOG = LoggerFactory.getLogger(TestZKLeaderManager.class);
048
049  private static final String LEADER_ZNODE = "/test/" + TestZKLeaderManager.class.getSimpleName();
050
051  private static class MockAbortable implements Abortable {
052    private boolean aborted;
053
054    @Override
055    public void abort(String why, Throwable e) {
056      aborted = true;
057      LOG.error(HBaseMarkers.FATAL, "Aborting during test: " + why, e);
058      fail("Aborted during test: " + why);
059    }
060
061    @Override
062    public boolean isAborted() {
063      return aborted;
064    }
065  }
066
067  private static class MockLeader extends Thread implements Stoppable {
068    private volatile boolean stopped;
069    private ZKWatcher watcher;
070    private ZKLeaderManager zkLeader;
071    private AtomicBoolean master = new AtomicBoolean(false);
072    private int index;
073
074    MockLeader(ZKWatcher watcher, int index) {
075      setDaemon(true);
076      setName("TestZKLeaderManager-leader-" + index);
077      this.index = index;
078      this.watcher = watcher;
079      this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE, Bytes.toBytes(index), this);
080    }
081
082    public boolean isMaster() {
083      return master.get();
084    }
085
086    public int getIndex() {
087      return index;
088    }
089
090    public ZKWatcher getWatcher() {
091      return watcher;
092    }
093
094    @Override
095    public void run() {
096      while (!stopped) {
097        zkLeader.start();
098        zkLeader.waitToBecomeLeader();
099        master.set(true);
100
101        while (master.get() && !stopped) {
102          try {
103            Thread.sleep(10);
104          } catch (InterruptedException ignored) {
105          }
106        }
107      }
108    }
109
110    void abdicate() {
111      zkLeader.stepDownAsLeader();
112      master.set(false);
113    }
114
115    @Override
116    public void stop(String why) {
117      stopped = true;
118      abdicate();
119      Threads.sleep(100);
120      watcher.close();
121    }
122
123    @Override
124    public boolean isStopped() {
125      return stopped;
126    }
127  }
128
129  private static HBaseZKTestingUtil TEST_UTIL;
130  private static MockLeader[] CANDIDATES;
131
132  @BeforeAll
133  public static void setupBeforeClass() throws Exception {
134    TEST_UTIL = new HBaseZKTestingUtil();
135    TEST_UTIL.startMiniZKCluster();
136    Configuration conf = TEST_UTIL.getConfiguration();
137
138    // use an abortable to fail the test in the case of any KeeperExceptions
139    MockAbortable abortable = new MockAbortable();
140    int count = 5;
141    CANDIDATES = new MockLeader[count];
142    for (int i = 0; i < count; i++) {
143      ZKWatcher watcher = newZK(conf, "server" + i, abortable);
144      CANDIDATES[i] = new MockLeader(watcher, i);
145      CANDIDATES[i].start();
146    }
147  }
148
149  @AfterAll
150  public static void tearDownAfterClass() throws Exception {
151    TEST_UTIL.shutdownMiniZKCluster();
152  }
153
154  @Test
155  public void testLeaderSelection() throws Exception {
156    MockLeader currentLeader = getCurrentLeader();
157    // one leader should have been found
158    assertNotNull(currentLeader, "Leader should exist");
159    LOG.debug("Current leader index is " + currentLeader.getIndex());
160
161    byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
162    assertNotNull(znodeData, "Leader znode should contain leader index");
163    assertTrue(znodeData.length > 0, "Leader znode should not be empty");
164    int storedIndex = Bytes.toInt(znodeData);
165    LOG.debug("Stored leader index in ZK is " + storedIndex);
166    assertEquals(currentLeader.getIndex(), storedIndex, "Leader znode should match leader index");
167
168    // force a leader transition
169    currentLeader.abdicate();
170
171    // check for new leader
172    currentLeader = getCurrentLeader();
173    // one leader should have been found
174    assertNotNull(currentLeader, "New leader should exist after abdication");
175    LOG.debug("New leader index is " + currentLeader.getIndex());
176
177    znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
178    assertNotNull(znodeData, "Leader znode should contain leader index");
179    assertTrue(znodeData.length > 0, "Leader znode should not be empty");
180    storedIndex = Bytes.toInt(znodeData);
181    LOG.debug("Stored leader index in ZK is " + storedIndex);
182    assertEquals(currentLeader.getIndex(), storedIndex, "Leader znode should match leader index");
183
184    // force another transition by stopping the current
185    currentLeader.stop("Stopping for test");
186
187    // check for new leader
188    currentLeader = getCurrentLeader();
189    // one leader should have been found
190    assertNotNull(currentLeader, "New leader should exist after stop");
191    LOG.debug("New leader index is " + currentLeader.getIndex());
192
193    znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
194    assertNotNull(znodeData, "Leader znode should contain leader index");
195    assertTrue(znodeData.length > 0, "Leader znode should not be empty");
196    storedIndex = Bytes.toInt(znodeData);
197    LOG.debug("Stored leader index in ZK is " + storedIndex);
198    assertEquals(currentLeader.getIndex(), storedIndex, "Leader znode should match leader index");
199
200    // with a second stop we can guarantee that a previous leader has resumed leading
201    currentLeader.stop("Stopping for test");
202
203    // check for new
204    currentLeader = getCurrentLeader();
205    assertNotNull(currentLeader, "New leader should exist");
206  }
207
208  private MockLeader getCurrentLeader() {
209    MockLeader currentLeader = null;
210
211    // Wait up to 10 secs for initial leader
212    for (int i = 0; i < 1000; i++) {
213      for (int j = 0; j < CANDIDATES.length; j++) {
214        if (CANDIDATES[j].isMaster()) {
215          // should only be one leader
216          if (currentLeader != null) {
217            fail(
218              "Both candidate " + currentLeader.getIndex() + " and " + j + " claim to be leader!");
219          }
220          currentLeader = CANDIDATES[j];
221        }
222      }
223      if (currentLeader != null) {
224        break;
225      }
226      Threads.sleep(100);
227    }
228    return currentLeader;
229  }
230
231  private static ZKWatcher newZK(Configuration conf, String name, Abortable abort)
232    throws Exception {
233    Configuration copy = HBaseConfiguration.create(conf);
234    return new ZKWatcher(copy, name, abort);
235  }
236}