001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseZKTestingUtility;
031import org.apache.hadoop.hbase.Stoppable;
032import org.apache.hadoop.hbase.log.HBaseMarkers;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.apache.hadoop.hbase.testclassification.ZKTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.Threads;
037import org.junit.AfterClass;
038import org.junit.BeforeClass;
039import org.junit.ClassRule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045@Category({ ZKTests.class, MediumTests.class })
046public class TestZKLeaderManager {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050    HBaseClassTestRule.forClass(TestZKLeaderManager.class);
051
052  private static final Logger LOG = LoggerFactory.getLogger(TestZKLeaderManager.class);
053
054  private static final String LEADER_ZNODE = "/test/" + TestZKLeaderManager.class.getSimpleName();
055
056  private static class MockAbortable implements Abortable {
057    private boolean aborted;
058
059    @Override
060    public void abort(String why, Throwable e) {
061      aborted = true;
062      LOG.error(HBaseMarkers.FATAL, "Aborting during test: " + why, e);
063      fail("Aborted during test: " + why);
064    }
065
066    @Override
067    public boolean isAborted() {
068      return aborted;
069    }
070  }
071
072  private static class MockLeader extends Thread implements Stoppable {
073    private volatile boolean stopped;
074    private ZKWatcher watcher;
075    private ZKLeaderManager zkLeader;
076    private AtomicBoolean master = new AtomicBoolean(false);
077    private int index;
078
079    MockLeader(ZKWatcher watcher, int index) {
080      setDaemon(true);
081      setName("TestZKLeaderManager-leader-" + index);
082      this.index = index;
083      this.watcher = watcher;
084      this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE, Bytes.toBytes(index), this);
085    }
086
087    public boolean isMaster() {
088      return master.get();
089    }
090
091    public int getIndex() {
092      return index;
093    }
094
095    public ZKWatcher getWatcher() {
096      return watcher;
097    }
098
099    @Override
100    public void run() {
101      while (!stopped) {
102        zkLeader.start();
103        zkLeader.waitToBecomeLeader();
104        master.set(true);
105
106        while (master.get() && !stopped) {
107          try {
108            Thread.sleep(10);
109          } catch (InterruptedException ignored) {
110          }
111        }
112      }
113    }
114
115    void abdicate() {
116      zkLeader.stepDownAsLeader();
117      master.set(false);
118    }
119
120    @Override
121    public void stop(String why) {
122      stopped = true;
123      abdicate();
124      Threads.sleep(100);
125      watcher.close();
126    }
127
128    @Override
129    public boolean isStopped() {
130      return stopped;
131    }
132  }
133
134  private static HBaseZKTestingUtility TEST_UTIL;
135  private static MockLeader[] CANDIDATES;
136
137  @BeforeClass
138  public static void setupBeforeClass() throws Exception {
139    TEST_UTIL = new HBaseZKTestingUtility();
140    TEST_UTIL.startMiniZKCluster();
141    Configuration conf = TEST_UTIL.getConfiguration();
142
143    // use an abortable to fail the test in the case of any KeeperExceptions
144    MockAbortable abortable = new MockAbortable();
145    int count = 5;
146    CANDIDATES = new MockLeader[count];
147    for (int i = 0; i < count; i++) {
148      ZKWatcher watcher = newZK(conf, "server" + i, abortable);
149      CANDIDATES[i] = new MockLeader(watcher, i);
150      CANDIDATES[i].start();
151    }
152  }
153
154  @AfterClass
155  public static void tearDownAfterClass() throws Exception {
156    TEST_UTIL.shutdownMiniZKCluster();
157  }
158
159  @Test
160  public void testLeaderSelection() throws Exception {
161    MockLeader currentLeader = getCurrentLeader();
162    // one leader should have been found
163    assertNotNull("Leader should exist", currentLeader);
164    LOG.debug("Current leader index is " + currentLeader.getIndex());
165
166    byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
167    assertNotNull("Leader znode should contain leader index", znodeData);
168    assertTrue("Leader znode should not be empty", znodeData.length > 0);
169    int storedIndex = Bytes.toInt(znodeData);
170    LOG.debug("Stored leader index in ZK is " + storedIndex);
171    assertEquals("Leader znode should match leader index", currentLeader.getIndex(), storedIndex);
172
173    // force a leader transition
174    currentLeader.abdicate();
175
176    // check for new leader
177    currentLeader = getCurrentLeader();
178    // one leader should have been found
179    assertNotNull("New leader should exist after abdication", currentLeader);
180    LOG.debug("New leader index is " + currentLeader.getIndex());
181
182    znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
183    assertNotNull("Leader znode should contain leader index", znodeData);
184    assertTrue("Leader znode should not be empty", znodeData.length > 0);
185    storedIndex = Bytes.toInt(znodeData);
186    LOG.debug("Stored leader index in ZK is " + storedIndex);
187    assertEquals("Leader znode should match leader index", currentLeader.getIndex(), storedIndex);
188
189    // force another transition by stopping the current
190    currentLeader.stop("Stopping for test");
191
192    // check for new leader
193    currentLeader = getCurrentLeader();
194    // one leader should have been found
195    assertNotNull("New leader should exist after stop", currentLeader);
196    LOG.debug("New leader index is " + currentLeader.getIndex());
197
198    znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
199    assertNotNull("Leader znode should contain leader index", znodeData);
200    assertTrue("Leader znode should not be empty", znodeData.length > 0);
201    storedIndex = Bytes.toInt(znodeData);
202    LOG.debug("Stored leader index in ZK is " + storedIndex);
203    assertEquals("Leader znode should match leader index", currentLeader.getIndex(), storedIndex);
204
205    // with a second stop we can guarantee that a previous leader has resumed leading
206    currentLeader.stop("Stopping for test");
207
208    // check for new
209    currentLeader = getCurrentLeader();
210    assertNotNull("New leader should exist", currentLeader);
211  }
212
213  private MockLeader getCurrentLeader() {
214    MockLeader currentLeader = null;
215
216    // Wait up to 10 secs for initial leader
217    for (int i = 0; i < 1000; i++) {
218      for (int j = 0; j < CANDIDATES.length; j++) {
219        if (CANDIDATES[j].isMaster()) {
220          // should only be one leader
221          if (currentLeader != null) {
222            fail(
223              "Both candidate " + currentLeader.getIndex() + " and " + j + " claim to be leader!");
224          }
225          currentLeader = CANDIDATES[j];
226        }
227      }
228      if (currentLeader != null) {
229        break;
230      }
231      Threads.sleep(100);
232    }
233    return currentLeader;
234  }
235
236  private static ZKWatcher newZK(Configuration conf, String name, Abortable abort)
237    throws Exception {
238    Configuration copy = HBaseConfiguration.create(conf);
239    return new ZKWatcher(copy, name, abort);
240  }
241}