/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
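
/**
 * A chore that deletes replication lock znodes under ZooKeeper that are held by region servers
 * which are no longer alive, so that stale locks do not linger after a lock holder dies.
 * A stale lock is only removed once it is older than the configured TTL.
 */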
@InterfaceAudience.Private
public class ReplicationZKLockCleanerChore extends ScheduledChore {
  private static final Log LOG = LogFactory.getLog(ReplicationZKLockCleanerChore.class);
  private ZooKeeperWatcher zk;
  private ReplicationTracker tracker;
  private long ttl;
  private ReplicationQueuesZKImpl queues;

  // Default TTL for a stale lock: 10 minutes, in milliseconds.
  private static final long DEFAULT_TTL = 60 * 10 * 1000;

  @VisibleForTesting
  public static final String TTL_CONFIG_KEY = "hbase.replication.zk.deadrs.lock.ttl";

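  /**
   * @param stopper stoppable for this chore and the replication tracker
   * @param abortable used to abort on unrecoverable ZooKeeper or replication errors
   * @param period how often this chore runs
   * @param zk watcher on the cluster's ZooKeeper ensemble
   * @param conf configuration, read for {@link #TTL_CONFIG_KEY}
   */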
  public ReplicationZKLockCleanerChore(Stoppable stopper, Abortable abortable, int period,
      ZooKeeperWatcher zk, Configuration conf) throws Exception {
    super("ReplicationZKLockCleanerChore", stopper, period);

    this.zk = zk;
    this.ttl = conf.getLong(TTL_CONFIG_KEY, DEFAULT_TTL);
    tracker = ReplicationFactory.getReplicationTracker(zk,
        ReplicationFactory.getReplicationPeers(zk, conf, abortable), conf, abortable, stopper);
    queues = new ReplicationQueuesZKImpl(zk, conf, abortable);
  }

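  /**
   * Compares the owner recorded in each replication lock znode against the set of live region
   * servers, and deletes any lock whose owner is dead and whose lock znode has not been modified
   * for longer than the configured TTL.
   */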
  @Override protected void chore() {
    try {
      // Region servers that are currently alive, according to the replication tracker.
      List<String> regionServers = tracker.getListOfRegionServers();
      if (regionServers == null) {
        return;
      }
      Set<String> rsSet = new HashSet<String>(regionServers);
      // Region servers that still own replication queue znodes; may include dead servers.
      List<String> replicators = queues.getListOfReplicators();
      if (replicators == null || replicators.isEmpty()) {
        return;
      }
      for (String replicator : replicators) {
        try {
          String lockNode = queues.getLockZNode(replicator);
          byte[] data = ZKUtil.getData(zk, lockNode);
          if (data == null) {
            continue;
          }
          // The lock znode stores the znode path of the region server holding the lock;
          // its last path component is that server's name.
          String rsServerNameZnode = Bytes.toString(data);
          String[] array = rsServerNameZnode.split("/");
          String znode = array[array.length - 1];
          if (!rsSet.contains(znode)) {
            // The lock holder is dead: delete the lock once it is older than the TTL.
            Stat s = zk.getRecoverableZooKeeper().exists(lockNode, false);
            if (s != null && EnvironmentEdgeManager.currentTime() - s.getMtime() > this.ttl) {
              ZKUtil.deleteNode(zk, lockNode);
              LOG.info("Removed lock " + lockNode + " held by dead RS " + znode);
            }
            continue;
          }
          LOG.info("Skipping lock " + lockNode + " held by live RS " + znode);
        } catch (KeeperException.NoNodeException ignore) {
          // Lock znode disappeared between listing and reading it; nothing to clean.
        } catch (InterruptedException e) {
          LOG.warn("zk operation interrupted", e);
          Thread.currentThread().interrupt();
        }
      }
    } catch (KeeperException e) {
      LOG.warn("zk operation failed", e);
    } catch (ReplicationException e2) {
      LOG.warn("replication exception", e2);
    }
  }
}