1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;

import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import java.io.IOException;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 @InterfaceAudience.Private
56 public class Leases extends HasThread {
57 private static final Log LOG = LogFactory.getLog(Leases.class.getName());
58 public static final int MIN_WAIT_TIME = 100;
59 private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
60
61 protected final int leaseCheckFrequency;
62 protected volatile boolean stopRequested = false;
63
64
65
66
67
68
69
/**
 * Creates the lease checker and flags it as a daemon via {@code setDaemon(true)}.
 *
 * @param leaseCheckFrequency upper bound, in milliseconds, on how long {@link #run()} sleeps
 *     between two scans for expired leases
 */
public Leases(final int leaseCheckFrequency) {
  setDaemon(true);
  this.leaseCheckFrequency = leaseCheckFrequency;
}
74
75
76
77
78 @Override
79 public void run() {
80 long toWait = leaseCheckFrequency;
81 Lease nextLease = null;
82 long nextLeaseDelay = Long.MAX_VALUE;
83
84 while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
85
86 try {
87 if (nextLease != null) {
88 toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
89 }
90
91 toWait = Math.min(leaseCheckFrequency, toWait);
92 toWait = Math.max(MIN_WAIT_TIME, toWait);
93
94 Thread.sleep(toWait);
95 } catch (InterruptedException e) {
96 continue;
97 } catch (ConcurrentModificationException e) {
98 continue;
99 } catch (Throwable e) {
100 LOG.fatal("Unexpected exception killed leases thread", e);
101 break;
102 }
103
104 nextLease = null;
105 nextLeaseDelay = Long.MAX_VALUE;
106 for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
107 Map.Entry<String, Lease> entry = it.next();
108 Lease lease = entry.getValue();
109 long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
110 if ( thisLeaseDelay > 0) {
111 if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
112 nextLease = lease;
113 nextLeaseDelay = thisLeaseDelay;
114 }
115 } else {
116
117
118 if (lease.getListener() == null) {
119 LOG.error("lease listener is null for lease " + lease.getLeaseName());
120 } else {
121 lease.getListener().leaseExpired();
122 }
123 it.remove();
124 }
125 }
126 }
127 close();
128 }
129
130
131
132
133
134
135
136
137 public void closeAfterLeasesExpire() {
138 this.stopRequested = true;
139 }
140
141
142
143
144
145 public void close() {
146 LOG.info(Thread.currentThread().getName() + " closing leases");
147 this.stopRequested = true;
148 leases.clear();
149 LOG.info(Thread.currentThread().getName() + " closed leases");
150 }
151
152
153
154
155
156
157
158
159
160 public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
161 throws LeaseStillHeldException {
162 addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
163 }
164
165
166
167
168
169
170 public void addLease(final Lease lease) throws LeaseStillHeldException {
171 if (this.stopRequested) {
172 return;
173 }
174 if (leases.containsKey(lease.getLeaseName())) {
175 throw new LeaseStillHeldException(lease.getLeaseName());
176 }
177 lease.resetExpirationTime();
178 leases.put(lease.getLeaseName(), lease);
179 }
180
181
182
183
184
185
186
187 public void renewLease(final String leaseName) throws LeaseException {
188 if (this.stopRequested) {
189 return;
190 }
191 Lease lease = leases.get(leaseName);
192
193 if (lease == null ) {
194 throw new LeaseException("lease '" + leaseName +
195 "' does not exist or has already expired");
196 }
197 lease.resetExpirationTime();
198 }
199
200
201
202
203
204
205 public void cancelLease(final String leaseName) throws LeaseException {
206 removeLease(leaseName);
207 }
208
209
210
211
212
213
214
215
216
217
218 Lease removeLease(final String leaseName) throws LeaseException {
219 Lease lease = leases.remove(leaseName);
220 if (lease == null) {
221 throw new LeaseException("lease '" + leaseName + "' does not exist");
222 }
223 return lease;
224 }
225
226
227
228
229
230 @SuppressWarnings("serial")
231 public static class LeaseStillHeldException extends IOException {
232 private final String leaseName;
233
234
235
236
237 public LeaseStillHeldException(final String name) {
238 this.leaseName = name;
239 }
240
241
242 public String getName() {
243 return this.leaseName;
244 }
245 }
246
247
248 static class Lease implements Delayed {
249 private final String leaseName;
250 private final LeaseListener listener;
251 private int leaseTimeoutPeriod;
252 private long expirationTime;
253
254 Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
255 this.leaseName = leaseName;
256 this.listener = listener;
257 this.leaseTimeoutPeriod = leaseTimeoutPeriod;
258 this.expirationTime = 0;
259 }
260
261
262 public String getLeaseName() {
263 return leaseName;
264 }
265
266
267 public LeaseListener getListener() {
268 return this.listener;
269 }
270
271 @Override
272 public boolean equals(Object obj) {
273 if (this == obj) {
274 return true;
275 }
276 if (obj == null) {
277 return false;
278 }
279 if (getClass() != obj.getClass()) {
280 return false;
281 }
282 return this.hashCode() == obj.hashCode();
283 }
284
285 @Override
286 public int hashCode() {
287 return this.leaseName.hashCode();
288 }
289
290 public long getDelay(TimeUnit unit) {
291 return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
292 TimeUnit.MILLISECONDS);
293 }
294
295 public int compareTo(Delayed o) {
296 long delta = this.getDelay(TimeUnit.MILLISECONDS) -
297 o.getDelay(TimeUnit.MILLISECONDS);
298
299 return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
300 }
301
302
303
304
305 public void resetExpirationTime() {
306 this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
307 }
308 }
309 }