1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
25 import org.apache.hadoop.hbase.util.HasThread;
26
27 import java.util.ConcurrentModificationException;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.Delayed;
32 import java.util.concurrent.TimeUnit;
33
34 import java.io.IOException;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 @InterfaceAudience.Private
56 public class Leases extends HasThread {
57 private static final Log LOG = LogFactory.getLog(Leases.class.getName());
58 public static final int MIN_WAIT_TIME = 100;
59 private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
60
61 protected final int leaseCheckFrequency;
62 protected volatile boolean stopRequested = false;
63
64
65
66
67
68
69
70 public Leases(final int leaseCheckFrequency) {
71 super("RegionServerLeases");
72 this.leaseCheckFrequency = leaseCheckFrequency;
73 setDaemon(true);
74 }
75
76
77
78
79 @Override
80 public void run() {
81 long toWait = leaseCheckFrequency;
82 Lease nextLease = null;
83 long nextLeaseDelay = Long.MAX_VALUE;
84
85 while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
86
87 try {
88 if (nextLease != null) {
89 toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
90 }
91
92 toWait = Math.min(leaseCheckFrequency, toWait);
93 toWait = Math.max(MIN_WAIT_TIME, toWait);
94
95 Thread.sleep(toWait);
96 } catch (InterruptedException e) {
97 continue;
98 } catch (ConcurrentModificationException e) {
99 continue;
100 } catch (Throwable e) {
101 LOG.fatal("Unexpected exception killed leases thread", e);
102 break;
103 }
104
105 nextLease = null;
106 nextLeaseDelay = Long.MAX_VALUE;
107 for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
108 Map.Entry<String, Lease> entry = it.next();
109 Lease lease = entry.getValue();
110 long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
111 if ( thisLeaseDelay > 0) {
112 if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
113 nextLease = lease;
114 nextLeaseDelay = thisLeaseDelay;
115 }
116 } else {
117
118
119 if (lease.getListener() == null) {
120 LOG.error("lease listener is null for lease " + lease.getLeaseName());
121 } else {
122 lease.getListener().leaseExpired();
123 }
124 it.remove();
125 }
126 }
127 }
128 close();
129 }
130
131
132
133
134
135
136
137
138 public void closeAfterLeasesExpire() {
139 this.stopRequested = true;
140 }
141
142
143
144
145
146 public void close() {
147 LOG.info(Thread.currentThread().getName() + " closing leases");
148 this.stopRequested = true;
149 leases.clear();
150 LOG.info(Thread.currentThread().getName() + " closed leases");
151 }
152
153
154
155
156
157
158
159
160
161 public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
162 throws LeaseStillHeldException {
163 addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
164 }
165
166
167
168
169
170
171 public void addLease(final Lease lease) throws LeaseStillHeldException {
172 if (this.stopRequested) {
173 return;
174 }
175 if (leases.containsKey(lease.getLeaseName())) {
176 throw new LeaseStillHeldException(lease.getLeaseName());
177 }
178 lease.resetExpirationTime();
179 leases.put(lease.getLeaseName(), lease);
180 }
181
182
183
184
185
186
187
188 public void renewLease(final String leaseName) throws LeaseException {
189 if (this.stopRequested) {
190 return;
191 }
192 Lease lease = leases.get(leaseName);
193
194 if (lease == null ) {
195 throw new LeaseException("lease '" + leaseName +
196 "' does not exist or has already expired");
197 }
198 lease.resetExpirationTime();
199 }
200
201
202
203
204
205
206 public void cancelLease(final String leaseName) throws LeaseException {
207 removeLease(leaseName);
208 }
209
210
211
212
213
214
215
216
217
218
219 Lease removeLease(final String leaseName) throws LeaseException {
220 Lease lease = leases.remove(leaseName);
221 if (lease == null) {
222 throw new LeaseException("lease '" + leaseName + "' does not exist");
223 }
224 return lease;
225 }
226
227
228
229
230
231 @SuppressWarnings("serial")
232 public static class LeaseStillHeldException extends IOException {
233 private final String leaseName;
234
235
236
237
238 public LeaseStillHeldException(final String name) {
239 this.leaseName = name;
240 }
241
242
243 public String getName() {
244 return this.leaseName;
245 }
246 }
247
248
249 static class Lease implements Delayed {
250 private final String leaseName;
251 private final LeaseListener listener;
252 private int leaseTimeoutPeriod;
253 private long expirationTime;
254
255 Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
256 this.leaseName = leaseName;
257 this.listener = listener;
258 this.leaseTimeoutPeriod = leaseTimeoutPeriod;
259 this.expirationTime = 0;
260 }
261
262
263 public String getLeaseName() {
264 return leaseName;
265 }
266
267
268 public LeaseListener getListener() {
269 return this.listener;
270 }
271
272 @Override
273 public boolean equals(Object obj) {
274 if (this == obj) {
275 return true;
276 }
277 if (obj == null) {
278 return false;
279 }
280 if (getClass() != obj.getClass()) {
281 return false;
282 }
283 return this.hashCode() == obj.hashCode();
284 }
285
286 @Override
287 public int hashCode() {
288 return this.leaseName.hashCode();
289 }
290
291 public long getDelay(TimeUnit unit) {
292 return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
293 TimeUnit.MILLISECONDS);
294 }
295
296 public int compareTo(Delayed o) {
297 long delta = this.getDelay(TimeUnit.MILLISECONDS) -
298 o.getDelay(TimeUnit.MILLISECONDS);
299
300 return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
301 }
302
303
304
305
306 public void resetExpirationTime() {
307 this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
308 }
309 }
310 }