001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.zookeeper;
020
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import org.apache.hadoop.hbase.Stoppable;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.zookeeper.KeeperException;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * Handles coordination of a single "leader" instance among many possible
032 * candidates.  The first {@link ZKLeaderManager} to successfully create
033 * the given znode becomes the leader, allowing the instance to continue
034 * with whatever processing must be protected.  Other {@link ZKLeaderManager}
035 * instances will wait to be notified of changes to the leader znode.
036 * If the current master instance fails, the ephemeral leader znode will
037 * be removed, and all waiting instances will be notified, with the race
038 * to claim the leader znode beginning all over again.
039 * @deprecated Not used
040 */
041@Deprecated
042@InterfaceAudience.Private
043public class ZKLeaderManager extends ZKListener {
044  private static final Logger LOG = LoggerFactory.getLogger(ZKLeaderManager.class);
045
046  private final Object lock = new Object();
047  private final AtomicBoolean leaderExists = new AtomicBoolean();
048  private final String leaderZNode;
049  private final byte[] nodeId;
050  private final Stoppable candidate;
051
052  public ZKLeaderManager(ZKWatcher watcher, String leaderZNode,
053                         byte[] identifier, Stoppable candidate) {
054    super(watcher);
055    this.leaderZNode = leaderZNode;
056    this.nodeId = identifier;
057    this.candidate = candidate;
058  }
059
060  public void start() {
061    try {
062      watcher.registerListener(this);
063      String parent = ZKUtil.getParent(leaderZNode);
064      if (ZKUtil.checkExists(watcher, parent) < 0) {
065        ZKUtil.createWithParents(watcher, parent);
066      }
067    } catch (KeeperException ke) {
068      watcher.abort("Unhandled zk exception when starting", ke);
069      candidate.stop("Unhandled zk exception starting up: "+ke.getMessage());
070    }
071  }
072
073  @Override
074  public void nodeCreated(String path) {
075    if (leaderZNode.equals(path) && !candidate.isStopped()) {
076      handleLeaderChange();
077    }
078  }
079
080  @Override
081  public void nodeDeleted(String path) {
082    if (leaderZNode.equals(path) && !candidate.isStopped()) {
083      handleLeaderChange();
084    }
085  }
086
087  private void handleLeaderChange() {
088    try {
089      synchronized(lock) {
090        if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) {
091          LOG.info("Found new leader for znode: "+leaderZNode);
092          leaderExists.set(true);
093        } else {
094          LOG.info("Leader change, but no new leader found");
095          leaderExists.set(false);
096          lock.notifyAll();
097        }
098      }
099    } catch (KeeperException ke) {
100      watcher.abort("ZooKeeper error checking for leader znode", ke);
101      candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage());
102    }
103  }
104
105  /**
106   * Blocks until this instance has claimed the leader ZNode in ZooKeeper
107   */
108  public void waitToBecomeLeader() {
109    while (!candidate.isStopped()) {
110      try {
111        if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) {
112          // claimed the leader znode
113          leaderExists.set(true);
114          if (LOG.isDebugEnabled()) {
115            LOG.debug("Claimed the leader znode as '"+
116                Bytes.toStringBinary(nodeId)+"'");
117          }
118          return;
119        }
120
121        // if claiming the node failed, there should be another existing node
122        byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode);
123        if (currentId != null && Bytes.equals(currentId, nodeId)) {
124          // claimed with our ID, but we didn't grab it, possibly restarted?
125          LOG.info("Found existing leader with our ID ("+
126              Bytes.toStringBinary(nodeId)+"), removing");
127          ZKUtil.deleteNode(watcher, leaderZNode);
128          leaderExists.set(false);
129        } else {
130          LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId));
131          leaderExists.set(true);
132        }
133      } catch (KeeperException ke) {
134        watcher.abort("Unexpected error from ZK, stopping candidate", ke);
135        candidate.stop("Unexpected error from ZK: "+ke.getMessage());
136        return;
137      }
138
139      // wait for next chance
140      synchronized(lock) {
141        while (leaderExists.get() && !candidate.isStopped()) {
142          try {
143            lock.wait();
144          } catch (InterruptedException ie) {
145            LOG.debug("Interrupted waiting on leader", ie);
146          }
147        }
148      }
149    }
150  }
151
152  /**
153   * Removes the leader znode, if it is currently claimed by this instance.
154   */
155  public void stepDownAsLeader() {
156    try {
157      synchronized(lock) {
158        if (!leaderExists.get()) {
159          return;
160        }
161        byte[] leaderId = ZKUtil.getData(watcher, leaderZNode);
162        if (leaderId != null && Bytes.equals(nodeId, leaderId)) {
163          LOG.info("Stepping down as leader");
164          ZKUtil.deleteNodeFailSilent(watcher, leaderZNode);
165          leaderExists.set(false);
166        } else {
167          LOG.info("Not current leader, no need to step down");
168        }
169      }
170    } catch (KeeperException ke) {
171      watcher.abort("Unhandled zookeeper exception removing leader node", ke);
172      candidate.stop("Unhandled zookeeper exception removing leader node: "
173          + ke.getMessage());
174    } catch (InterruptedException e) {
175      watcher.abort("Unhandled zookeeper exception removing leader node", e);
176      candidate.stop("Unhandled zookeeper exception removing leader node: "
177          + e.getMessage());
178    }
179  }
180
181  public boolean hasLeader() {
182    return leaderExists.get();
183  }
184}