001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.zookeeper; 020 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import org.apache.hadoop.hbase.Stoppable; 024import org.apache.hadoop.hbase.util.Bytes; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.zookeeper.KeeperException; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * Handles coordination of a single "leader" instance among many possible 032 * candidates. The first {@link ZKLeaderManager} to successfully create 033 * the given znode becomes the leader, allowing the instance to continue 034 * with whatever processing must be protected. Other {@link ZKLeaderManager} 035 * instances will wait to be notified of changes to the leader znode. 036 * If the current master instance fails, the ephemeral leader znode will 037 * be removed, and all waiting instances will be notified, with the race 038 * to claim the leader znode beginning all over again. 039 * @deprecated Not used 040 */ 041@Deprecated 042@InterfaceAudience.Private 043public class ZKLeaderManager extends ZKListener { 044 private static final Logger LOG = LoggerFactory.getLogger(ZKLeaderManager.class); 045 046 private final Object lock = new Object(); 047 private final AtomicBoolean leaderExists = new AtomicBoolean(); 048 private final String leaderZNode; 049 private final byte[] nodeId; 050 private final Stoppable candidate; 051 052 public ZKLeaderManager(ZKWatcher watcher, String leaderZNode, 053 byte[] identifier, Stoppable candidate) { 054 super(watcher); 055 this.leaderZNode = leaderZNode; 056 this.nodeId = identifier; 057 this.candidate = candidate; 058 } 059 060 public void start() { 061 try { 062 watcher.registerListener(this); 063 String parent = ZKUtil.getParent(leaderZNode); 064 if (ZKUtil.checkExists(watcher, parent) < 0) { 065 ZKUtil.createWithParents(watcher, parent); 066 } 067 } catch (KeeperException ke) { 068 watcher.abort("Unhandled zk exception when starting", ke); 069 candidate.stop("Unhandled zk exception starting up: "+ke.getMessage()); 070 } 071 } 072 073 @Override 074 public void nodeCreated(String path) { 075 if (leaderZNode.equals(path) && !candidate.isStopped()) { 076 handleLeaderChange(); 077 } 078 } 079 080 @Override 081 public void nodeDeleted(String path) { 082 if (leaderZNode.equals(path) && !candidate.isStopped()) { 083 handleLeaderChange(); 084 } 085 } 086 087 private void handleLeaderChange() { 088 try { 089 synchronized(lock) { 090 if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) { 091 LOG.info("Found new leader for znode: "+leaderZNode); 092 leaderExists.set(true); 093 } else { 094 LOG.info("Leader change, but no new leader found"); 095 leaderExists.set(false); 096 lock.notifyAll(); 097 } 098 } 099 } catch (KeeperException ke) { 100 watcher.abort("ZooKeeper error checking for leader znode", ke); 101 candidate.stop("ZooKeeper error checking for leader: "+ke.getMessage()); 102 } 103 } 104 105 /** 106 * Blocks until this instance has claimed the leader ZNode in ZooKeeper 107 */ 108 public void waitToBecomeLeader() { 109 while (!candidate.isStopped()) { 110 try { 111 if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) { 112 // claimed the leader znode 113 leaderExists.set(true); 114 if (LOG.isDebugEnabled()) { 115 LOG.debug("Claimed the leader znode as '"+ 116 Bytes.toStringBinary(nodeId)+"'"); 117 } 118 return; 119 } 120 121 // if claiming the node failed, there should be another existing node 122 byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode); 123 if (currentId != null && Bytes.equals(currentId, nodeId)) { 124 // claimed with our ID, but we didn't grab it, possibly restarted? 125 LOG.info("Found existing leader with our ID ("+ 126 Bytes.toStringBinary(nodeId)+"), removing"); 127 ZKUtil.deleteNode(watcher, leaderZNode); 128 leaderExists.set(false); 129 } else { 130 LOG.info("Found existing leader with ID: "+Bytes.toStringBinary(nodeId)); 131 leaderExists.set(true); 132 } 133 } catch (KeeperException ke) { 134 watcher.abort("Unexpected error from ZK, stopping candidate", ke); 135 candidate.stop("Unexpected error from ZK: "+ke.getMessage()); 136 return; 137 } 138 139 // wait for next chance 140 synchronized(lock) { 141 while (leaderExists.get() && !candidate.isStopped()) { 142 try { 143 lock.wait(); 144 } catch (InterruptedException ie) { 145 LOG.debug("Interrupted waiting on leader", ie); 146 } 147 } 148 } 149 } 150 } 151 152 /** 153 * Removes the leader znode, if it is currently claimed by this instance. 154 */ 155 public void stepDownAsLeader() { 156 try { 157 synchronized(lock) { 158 if (!leaderExists.get()) { 159 return; 160 } 161 byte[] leaderId = ZKUtil.getData(watcher, leaderZNode); 162 if (leaderId != null && Bytes.equals(nodeId, leaderId)) { 163 LOG.info("Stepping down as leader"); 164 ZKUtil.deleteNodeFailSilent(watcher, leaderZNode); 165 leaderExists.set(false); 166 } else { 167 LOG.info("Not current leader, no need to step down"); 168 } 169 } 170 } catch (KeeperException ke) { 171 watcher.abort("Unhandled zookeeper exception removing leader node", ke); 172 candidate.stop("Unhandled zookeeper exception removing leader node: " 173 + ke.getMessage()); 174 } catch (InterruptedException e) { 175 watcher.abort("Unhandled zookeeper exception removing leader node", e); 176 candidate.stop("Unhandled zookeeper exception removing leader node: " 177 + e.getMessage()); 178 } 179 } 180 181 public boolean hasLeader() { 182 return leaderExists.get(); 183 } 184}