001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.zookeeper;
019
020import java.util.concurrent.atomic.AtomicBoolean;
021import org.apache.hadoop.hbase.Stoppable;
022import org.apache.hadoop.hbase.util.Bytes;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.apache.zookeeper.KeeperException;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * Handles coordination of a single "leader" instance among many possible candidates. The first
030 * {@link ZKLeaderManager} to successfully create the given znode becomes the leader, allowing the
031 * instance to continue with whatever processing must be protected. Other {@link ZKLeaderManager}
032 * instances will wait to be notified of changes to the leader znode. If the current master instance
033 * fails, the ephemeral leader znode will be removed, and all waiting instances will be notified,
034 * with the race to claim the leader znode beginning all over again.
035 * @deprecated Not used
036 */
037@Deprecated
038@InterfaceAudience.Private
039public class ZKLeaderManager extends ZKListener {
040  private static final Logger LOG = LoggerFactory.getLogger(ZKLeaderManager.class);
041
042  private final Object lock = new Object();
043  private final AtomicBoolean leaderExists = new AtomicBoolean();
044  private final String leaderZNode;
045  private final byte[] nodeId;
046  private final Stoppable candidate;
047
048  public ZKLeaderManager(ZKWatcher watcher, String leaderZNode, byte[] identifier,
049    Stoppable candidate) {
050    super(watcher);
051    this.leaderZNode = leaderZNode;
052    this.nodeId = identifier;
053    this.candidate = candidate;
054  }
055
056  public void start() {
057    try {
058      watcher.registerListener(this);
059      String parent = ZKUtil.getParent(leaderZNode);
060      if (ZKUtil.checkExists(watcher, parent) < 0) {
061        ZKUtil.createWithParents(watcher, parent);
062      }
063    } catch (KeeperException ke) {
064      watcher.abort("Unhandled zk exception when starting", ke);
065      candidate.stop("Unhandled zk exception starting up: " + ke.getMessage());
066    }
067  }
068
069  @Override
070  public void nodeCreated(String path) {
071    if (leaderZNode.equals(path) && !candidate.isStopped()) {
072      handleLeaderChange();
073    }
074  }
075
076  @Override
077  public void nodeDeleted(String path) {
078    if (leaderZNode.equals(path) && !candidate.isStopped()) {
079      handleLeaderChange();
080    }
081  }
082
083  private void handleLeaderChange() {
084    try {
085      synchronized (lock) {
086        if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
087          LOG.info("Found new leader for znode: {}", leaderZNode);
088          leaderExists.set(true);
089        } else {
090          LOG.info("Leader change, but no new leader found");
091          leaderExists.set(false);
092          lock.notifyAll();
093        }
094      }
095    } catch (KeeperException ke) {
096      watcher.abort("ZooKeeper error checking for leader znode", ke);
097      candidate.stop("ZooKeeper error checking for leader: " + ke.getMessage());
098    }
099  }
100
101  /**
102   * Blocks until this instance has claimed the leader ZNode in ZooKeeper
103   */
104  public void waitToBecomeLeader() {
105    while (!candidate.isStopped()) {
106      try {
107        if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
108          // claimed the leader znode
109          leaderExists.set(true);
110          if (LOG.isDebugEnabled()) {
111            LOG.debug("Claimed the leader znode as '" + Bytes.toStringBinary(nodeId) + "'");
112          }
113          return;
114        }
115
116        // if claiming the node failed, there should be another existing node
117        byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
118        if (currentId != null && Bytes.equals(currentId, nodeId)) {
119          // claimed with our ID, but we didn't grab it, possibly restarted?
120          LOG.info(
121            "Found existing leader with our ID (" + Bytes.toStringBinary(nodeId) + "), removing");
122          ZKUtil.deleteNode(watcher, leaderZNode);
123          leaderExists.set(false);
124        } else {
125          LOG.info("Found existing leader with ID: {}", Bytes.toStringBinary(currentId));
126          leaderExists.set(true);
127        }
128      } catch (KeeperException ke) {
129        watcher.abort("Unexpected error from ZK, stopping candidate", ke);
130        candidate.stop("Unexpected error from ZK: " + ke.getMessage());
131        return;
132      }
133
134      // wait for next chance
135      synchronized (lock) {
136        while (leaderExists.get() && !candidate.isStopped()) {
137          try {
138            lock.wait();
139          } catch (InterruptedException ie) {
140            LOG.debug("Interrupted waiting on leader", ie);
141          }
142        }
143      }
144    }
145  }
146
147  /**
148   * Removes the leader znode, if it is currently claimed by this instance.
149   */
150  public void stepDownAsLeader() {
151    try {
152      synchronized (lock) {
153        if (!leaderExists.get()) {
154          return;
155        }
156        byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
157        if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
158          LOG.info("Stepping down as leader");
159          ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
160          leaderExists.set(false);
161        } else {
162          LOG.info("Not current leader, no need to step down");
163        }
164      }
165    } catch (KeeperException ke) {
166      watcher.abort("Unhandled zookeeper exception removing leader node", ke);
167      candidate.stop("Unhandled zookeeper exception removing leader node: " + ke.getMessage());
168    } catch (InterruptedException e) {
169      watcher.abort("Unhandled zookeeper exception removing leader node", e);
170      candidate.stop("Unhandled zookeeper exception removing leader node: " + e.getMessage());
171    }
172  }
173
174  public boolean hasLeader() {
175    return leaderExists.get();
176  }
177}