/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org.apache.hadoop.hbase.master.cleaner;
20
21 import com.google.common.annotations.VisibleForTesting;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Set;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Abortable;
29 import org.apache.hadoop.hbase.ScheduledChore;
30 import org.apache.hadoop.hbase.Stoppable;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.replication.ReplicationFactory;
33 import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
34 import org.apache.hadoop.hbase.replication.ReplicationTracker;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
37 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
38 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
39 import org.apache.zookeeper.KeeperException;
40 import org.apache.zookeeper.data.Stat;
41
42
43
44
45
46 @InterfaceAudience.Private
47 public class ReplicationZKLockCleanerChore extends ScheduledChore {
48 private static final Log LOG = LogFactory.getLog(ReplicationZKLockCleanerChore.class);
49 private ZooKeeperWatcher zk;
50 private ReplicationTracker tracker;
51 private long ttl;
52 private ReplicationQueuesZKImpl queues;
53
54
55 private static final long DEFAULT_TTL = 60 * 10 * 1000;
56
57 @VisibleForTesting
58 public static final String TTL_CONFIG_KEY = "hbase.replication.zk.deadrs.lock.ttl";
59
60 public ReplicationZKLockCleanerChore(Stoppable stopper, Abortable abortable, int period,
61 ZooKeeperWatcher zk, Configuration conf) throws Exception {
62 super("ReplicationZKLockCleanerChore", stopper, period);
63
64 this.zk = zk;
65 this.ttl = conf.getLong(TTL_CONFIG_KEY, DEFAULT_TTL);
66 tracker = ReplicationFactory.getReplicationTracker(zk,
67 ReplicationFactory.getReplicationPeers(zk, conf, abortable), conf, abortable, stopper);
68 queues = new ReplicationQueuesZKImpl(zk, conf, abortable);
69 }
70
71 @Override protected void chore() {
72 try {
73 List<String> regionServers = tracker.getListOfRegionServers();
74 if (regionServers == null) {
75 return;
76 }
77 Set<String> rsSet = new HashSet<String>(regionServers);
78 List<String> replicators = queues.getListOfReplicators();
79
80 for (String replicator: replicators) {
81 try {
82 String lockNode = queues.getLockZNode(replicator);
83 byte[] data = ZKUtil.getData(zk, lockNode);
84 if (data == null) {
85 continue;
86 }
87 String rsServerNameZnode = Bytes.toString(data);
88 String[] array = rsServerNameZnode.split("/");
89 String znode = array[array.length - 1];
90 if (!rsSet.contains(znode)) {
91 Stat s = zk.getRecoverableZooKeeper().exists(lockNode, false);
92 if (s != null && EnvironmentEdgeManager.currentTime() - s.getMtime() > this.ttl) {
93
94 ZKUtil.deleteNode(zk, lockNode);
95 LOG.info("Remove lock acquired by dead RS: " + lockNode + " by " + znode);
96 }
97 continue;
98 }
99 LOG.info("Skip lock acquired by live RS: " + lockNode + " by " + znode);
100
101 } catch (KeeperException.NoNodeException ignore) {
102 } catch (InterruptedException e) {
103 LOG.warn("zk operation interrupted", e);
104 Thread.currentThread().interrupt();
105 }
106 }
107 } catch (KeeperException e) {
108 LOG.warn("zk operation interrupted", e);
109 }
110
111 }
112 }