001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.zookeeper; 019 020import java.util.concurrent.atomic.AtomicBoolean; 021import org.apache.hadoop.hbase.Stoppable; 022import org.apache.hadoop.hbase.util.Bytes; 023import org.apache.yetus.audience.InterfaceAudience; 024import org.apache.zookeeper.KeeperException; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028/** 029 * Handles coordination of a single "leader" instance among many possible candidates. The first 030 * {@link ZKLeaderManager} to successfully create the given znode becomes the leader, allowing the 031 * instance to continue with whatever processing must be protected. Other {@link ZKLeaderManager} 032 * instances will wait to be notified of changes to the leader znode. If the current master instance 033 * fails, the ephemeral leader znode will be removed, and all waiting instances will be notified, 034 * with the race to claim the leader znode beginning all over again. 035 * @deprecated Not used 036 */ 037@Deprecated 038@InterfaceAudience.Private 039public class ZKLeaderManager extends ZKListener { 040 private static final Logger LOG = LoggerFactory.getLogger(ZKLeaderManager.class); 041 042 private final Object lock = new Object(); 043 private final AtomicBoolean leaderExists = new AtomicBoolean(); 044 private final String leaderZNode; 045 private final byte[] nodeId; 046 private final Stoppable candidate; 047 048 public ZKLeaderManager(ZKWatcher watcher, String leaderZNode, byte[] identifier, 049 Stoppable candidate) { 050 super(watcher); 051 this.leaderZNode = leaderZNode; 052 this.nodeId = identifier; 053 this.candidate = candidate; 054 } 055 056 public void start() { 057 try { 058 watcher.registerListener(this); 059 String parent = ZKUtil.getParent(leaderZNode); 060 if (ZKUtil.checkExists(watcher, parent) < 0) { 061 ZKUtil.createWithParents(watcher, parent); 062 } 063 } catch (KeeperException ke) { 064 watcher.abort("Unhandled zk exception when starting", ke); 065 candidate.stop("Unhandled zk exception starting up: " + ke.getMessage()); 066 } 067 } 068 069 @Override 070 public void nodeCreated(String path) { 071 if (leaderZNode.equals(path) && !candidate.isStopped()) { 072 handleLeaderChange(); 073 } 074 } 075 076 @Override 077 public void nodeDeleted(String path) { 078 if (leaderZNode.equals(path) && !candidate.isStopped()) { 079 handleLeaderChange(); 080 } 081 } 082 083 private void handleLeaderChange() { 084 try { 085 synchronized (lock) { 086 if (ZKUtil.watchAndCheckExists(watcher, leaderZNode)) { 087 LOG.info("Found new leader for znode: {}", leaderZNode); 088 leaderExists.set(true); 089 } else { 090 LOG.info("Leader change, but no new leader found"); 091 leaderExists.set(false); 092 lock.notifyAll(); 093 } 094 } 095 } catch (KeeperException ke) { 096 watcher.abort("ZooKeeper error checking for leader znode", ke); 097 candidate.stop("ZooKeeper error checking for leader: " + ke.getMessage()); 098 } 099 } 100 101 /** 102 * Blocks until this instance has claimed the leader ZNode in ZooKeeper 103 */ 104 public void waitToBecomeLeader() { 105 while (!candidate.isStopped()) { 106 try { 107 if (ZKUtil.createEphemeralNodeAndWatch(watcher, leaderZNode, nodeId)) { 108 // claimed the leader znode 109 leaderExists.set(true); 110 if (LOG.isDebugEnabled()) { 111 LOG.debug("Claimed the leader znode as '" + Bytes.toStringBinary(nodeId) + "'"); 112 } 113 return; 114 } 115 116 // if claiming the node failed, there should be another existing node 117 byte[] currentId = ZKUtil.getDataAndWatch(watcher, leaderZNode); 118 if (currentId != null && Bytes.equals(currentId, nodeId)) { 119 // claimed with our ID, but we didn't grab it, possibly restarted? 120 LOG.info( 121 "Found existing leader with our ID (" + Bytes.toStringBinary(nodeId) + "), removing"); 122 ZKUtil.deleteNode(watcher, leaderZNode); 123 leaderExists.set(false); 124 } else { 125 LOG.info("Found existing leader with ID: {}", Bytes.toStringBinary(currentId)); 126 leaderExists.set(true); 127 } 128 } catch (KeeperException ke) { 129 watcher.abort("Unexpected error from ZK, stopping candidate", ke); 130 candidate.stop("Unexpected error from ZK: " + ke.getMessage()); 131 return; 132 } 133 134 // wait for next chance 135 synchronized (lock) { 136 while (leaderExists.get() && !candidate.isStopped()) { 137 try { 138 lock.wait(); 139 } catch (InterruptedException ie) { 140 LOG.debug("Interrupted waiting on leader", ie); 141 } 142 } 143 } 144 } 145 } 146 147 /** 148 * Removes the leader znode, if it is currently claimed by this instance. 149 */ 150 public void stepDownAsLeader() { 151 try { 152 synchronized (lock) { 153 if (!leaderExists.get()) { 154 return; 155 } 156 byte[] leaderId = ZKUtil.getData(watcher, leaderZNode); 157 if (leaderId != null && Bytes.equals(nodeId, leaderId)) { 158 LOG.info("Stepping down as leader"); 159 ZKUtil.deleteNodeFailSilent(watcher, leaderZNode); 160 leaderExists.set(false); 161 } else { 162 LOG.info("Not current leader, no need to step down"); 163 } 164 } 165 } catch (KeeperException ke) { 166 watcher.abort("Unhandled zookeeper exception removing leader node", ke); 167 candidate.stop("Unhandled zookeeper exception removing leader node: " + ke.getMessage()); 168 } catch (InterruptedException e) { 169 watcher.abort("Unhandled zookeeper exception removing leader node", e); 170 candidate.stop("Unhandled zookeeper exception removing leader node: " + e.getMessage()); 171 } 172 } 173 174 public boolean hasLeader() { 175 return leaderExists.get(); 176 } 177}