001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Abortable;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseConfiguration;
030import org.apache.hadoop.hbase.HBaseZKTestingUtility;
031import org.apache.hadoop.hbase.Stoppable;
032import org.apache.hadoop.hbase.log.HBaseMarkers;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.apache.hadoop.hbase.testclassification.ZKTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.Threads;
037import org.junit.AfterClass;
038import org.junit.BeforeClass;
039import org.junit.ClassRule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045@Category({ ZKTests.class, MediumTests.class })
046public class TestZKLeaderManager {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050      HBaseClassTestRule.forClass(TestZKLeaderManager.class);
051
052  private static final Logger LOG = LoggerFactory.getLogger(TestZKLeaderManager.class);
053
054  private static final String LEADER_ZNODE =
055      "/test/" + TestZKLeaderManager.class.getSimpleName();
056
057  private static class MockAbortable implements Abortable {
058    private boolean aborted;
059
060    @Override
061    public void abort(String why, Throwable e) {
062      aborted = true;
063      LOG.error(HBaseMarkers.FATAL, "Aborting during test: "+why, e);
064      fail("Aborted during test: " + why);
065    }
066
067    @Override
068    public boolean isAborted() {
069      return aborted;
070    }
071  }
072
073  private static class MockLeader extends Thread implements Stoppable {
074    private volatile boolean stopped;
075    private ZKWatcher watcher;
076    private ZKLeaderManager zkLeader;
077    private AtomicBoolean master = new AtomicBoolean(false);
078    private int index;
079
080    MockLeader(ZKWatcher watcher, int index) {
081      setDaemon(true);
082      setName("TestZKLeaderManager-leader-" + index);
083      this.index = index;
084      this.watcher = watcher;
085      this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE,
086          Bytes.toBytes(index), this);
087    }
088
089    public boolean isMaster() {
090      return master.get();
091    }
092
093    public int getIndex() {
094      return index;
095    }
096
097    public ZKWatcher getWatcher() {
098      return watcher;
099    }
100
101    @Override
102    public void run() {
103      while (!stopped) {
104        zkLeader.start();
105        zkLeader.waitToBecomeLeader();
106        master.set(true);
107
108        while (master.get() && !stopped) {
109          try {
110            Thread.sleep(10);
111          } catch (InterruptedException ignored) {}
112        }
113      }
114    }
115
116    void abdicate() {
117      zkLeader.stepDownAsLeader();
118      master.set(false);
119    }
120
121    @Override
122    public void stop(String why) {
123      stopped = true;
124      abdicate();
125      Threads.sleep(100);
126      watcher.close();
127    }
128
129    @Override
130    public boolean isStopped() {
131      return stopped;
132    }
133  }
134
135  private static HBaseZKTestingUtility TEST_UTIL;
136  private static MockLeader[] CANDIDATES;
137
138  @BeforeClass
139  public static void setupBeforeClass() throws Exception {
140    TEST_UTIL = new HBaseZKTestingUtility();
141    TEST_UTIL.startMiniZKCluster();
142    Configuration conf = TEST_UTIL.getConfiguration();
143
144    // use an abortable to fail the test in the case of any KeeperExceptions
145    MockAbortable abortable = new MockAbortable();
146    int count = 5;
147    CANDIDATES = new MockLeader[count];
148    for (int i = 0; i < count; i++) {
149      ZKWatcher watcher = newZK(conf, "server"+i, abortable);
150      CANDIDATES[i] = new MockLeader(watcher, i);
151      CANDIDATES[i].start();
152    }
153  }
154
155  @AfterClass
156  public static void tearDownAfterClass() throws Exception {
157    TEST_UTIL.shutdownMiniZKCluster();
158  }
159
160  @Test
161  public void testLeaderSelection() throws Exception {
162    MockLeader currentLeader = getCurrentLeader();
163    // one leader should have been found
164    assertNotNull("Leader should exist", currentLeader);
165    LOG.debug("Current leader index is "+currentLeader.getIndex());
166
167    byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
168    assertNotNull("Leader znode should contain leader index", znodeData);
169    assertTrue("Leader znode should not be empty", znodeData.length > 0);
170    int storedIndex = Bytes.toInt(znodeData);
171    LOG.debug("Stored leader index in ZK is "+storedIndex);
172    assertEquals("Leader znode should match leader index",
173        currentLeader.getIndex(), storedIndex);
174
175    // force a leader transition
176    currentLeader.abdicate();
177
178    // check for new leader
179    currentLeader = getCurrentLeader();
180    // one leader should have been found
181    assertNotNull("New leader should exist after abdication", currentLeader);
182    LOG.debug("New leader index is "+currentLeader.getIndex());
183
184    znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
185    assertNotNull("Leader znode should contain leader index", znodeData);
186    assertTrue("Leader znode should not be empty", znodeData.length > 0);
187    storedIndex = Bytes.toInt(znodeData);
188    LOG.debug("Stored leader index in ZK is "+storedIndex);
189    assertEquals("Leader znode should match leader index",
190        currentLeader.getIndex(), storedIndex);
191
192    // force another transition by stopping the current
193    currentLeader.stop("Stopping for test");
194
195    // check for new leader
196    currentLeader = getCurrentLeader();
197    // one leader should have been found
198    assertNotNull("New leader should exist after stop", currentLeader);
199    LOG.debug("New leader index is "+currentLeader.getIndex());
200
201    znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE);
202    assertNotNull("Leader znode should contain leader index", znodeData);
203    assertTrue("Leader znode should not be empty", znodeData.length > 0);
204    storedIndex = Bytes.toInt(znodeData);
205    LOG.debug("Stored leader index in ZK is "+storedIndex);
206    assertEquals("Leader znode should match leader index",
207        currentLeader.getIndex(), storedIndex);
208
209    // with a second stop we can guarantee that a previous leader has resumed leading
210    currentLeader.stop("Stopping for test");
211
212    // check for new
213    currentLeader = getCurrentLeader();
214    assertNotNull("New leader should exist", currentLeader);
215  }
216
217  private MockLeader getCurrentLeader() {
218    MockLeader currentLeader = null;
219
220    // Wait up to 10 secs for initial leader
221    for (int i = 0; i < 1000; i++) {
222      for (int j = 0; j < CANDIDATES.length; j++) {
223        if (CANDIDATES[j].isMaster()) {
224          // should only be one leader
225          if (currentLeader != null) {
226            fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!");
227          }
228          currentLeader = CANDIDATES[j];
229        }
230      }
231      if (currentLeader != null) {
232        break;
233      }
234      Threads.sleep(100);
235    }
236    return currentLeader;
237  }
238
239  private static ZKWatcher newZK(Configuration conf, String name, Abortable abort)
240      throws Exception {
241    Configuration copy = HBaseConfiguration.create(conf);
242    return new ZKWatcher(copy, name, abort);
243  }
244}