View Javadoc

1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.hbase.util.HasThread;
25  
26  import java.util.ConcurrentModificationException;
27  import java.util.HashMap;
28  import java.util.Map;
29  import java.util.concurrent.Delayed;
30  import java.util.concurrent.DelayQueue;
31  import java.util.concurrent.TimeUnit;
32  
33  import java.io.IOException;
34  
35  /**
36   * Leases
37   *
38   * There are several server classes in HBase that need to track external
39   * clients that occasionally send heartbeats.
40   *
41   * <p>These external clients hold resources in the server class.
42   * Those resources need to be released if the external client fails to send a
43   * heartbeat after some interval of time passes.
44   *
45   * <p>The Leases class is a general reusable class for this kind of pattern.
46   * An instance of the Leases class will create a thread to do its dirty work.
47   * You should close() the instance if you want to clean up the thread properly.
48   *
49   * <p>
50   * NOTE: This class extends Thread rather than Chore because the sleep time
51   * can be interrupted when there is something to do, rather than the Chore
52   * sleep time which is invariant.
53   */
54  public class Leases extends HasThread {
55    private static final Log LOG = LogFactory.getLog(Leases.class.getName());
56    private final int leasePeriod;
57    private final int leaseCheckFrequency;
58    private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
59    protected final Map<String, Lease> leases = new HashMap<String, Lease>();
60    private volatile boolean stopRequested = false;
61  
62    /**
63     * Creates a lease monitor
64     *
65     * @param leasePeriod - length of time (milliseconds) that the lease is valid
66     * @param leaseCheckFrequency - how often the lease should be checked
67     * (milliseconds)
68     */
69    public Leases(final int leasePeriod, final int leaseCheckFrequency) {
70      this.leasePeriod = leasePeriod;
71      this.leaseCheckFrequency = leaseCheckFrequency;
72      setDaemon(true);
73    }
74  
75    /**
76     * @see java.lang.Thread#run()
77     */
78    @Override
79    public void run() {
80      while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) {
81        Lease lease = null;
82        try {
83          lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS);
84        } catch (InterruptedException e) {
85          continue;
86        } catch (ConcurrentModificationException e) {
87          continue;
88        } catch (Throwable e) {
89          LOG.fatal("Unexpected exception killed leases thread", e);
90          break;
91        }
92        if (lease == null) {
93          continue;
94        }
95        // A lease expired.  Run the expired code before removing from queue
96        // since its presence in queue is used to see if lease exists still.
97        if (lease.getListener() == null) {
98          LOG.error("lease listener is null for lease " + lease.getLeaseName());
99        } else {
100         lease.getListener().leaseExpired();
101       }
102       synchronized (leaseQueue) {
103         leases.remove(lease.getLeaseName());
104       }
105     }
106     close();
107   }
108 
109   /**
110    * Shuts down this lease instance when all outstanding leases expire.
111    * Like {@link #close()} but rather than violently end all leases, waits
112    * first on extant leases to finish.  Use this method if the lease holders
113    * could loose data, leak locks, etc.  Presumes client has shutdown
114    * allocation of new leases.
115    */
116   public void closeAfterLeasesExpire() {
117     this.stopRequested = true;
118   }
119 
120   /**
121    * Shut down this Leases instance.  All pending leases will be destroyed,
122    * without any cancellation calls.
123    */
124   public void close() {
125     LOG.info(Thread.currentThread().getName() + " closing leases");
126     this.stopRequested = true;
127     synchronized (leaseQueue) {
128       leaseQueue.clear();
129       leases.clear();
130       leaseQueue.notifyAll();
131     }
132     LOG.info(Thread.currentThread().getName() + " closed leases");
133   }
134 
135   /**
136    * Obtain a lease
137    *
138    * @param leaseName name of the lease
139    * @param listener listener that will process lease expirations
140    * @throws LeaseStillHeldException
141    */
142   public void createLease(String leaseName, final LeaseListener listener)
143   throws LeaseStillHeldException {
144     addLease(new Lease(leaseName, listener));
145   }
146 
147   /**
148    * Inserts lease.  Resets expiration before insertion.
149    * @param lease
150    * @throws LeaseStillHeldException
151    */
152   public void addLease(final Lease lease) throws LeaseStillHeldException {
153     if (this.stopRequested) {
154       return;
155     }
156     lease.setExpirationTime(System.currentTimeMillis() + this.leasePeriod);
157     synchronized (leaseQueue) {
158       if (leases.containsKey(lease.getLeaseName())) {
159         throw new LeaseStillHeldException(lease.getLeaseName());
160       }
161       leases.put(lease.getLeaseName(), lease);
162       leaseQueue.add(lease);
163     }
164   }
165 
166   /**
167    * Thrown if we are asked create a lease but lease on passed name already
168    * exists.
169    */
170   @SuppressWarnings("serial")
171   public static class LeaseStillHeldException extends IOException {
172     private final String leaseName;
173 
174     /**
175      * @param name
176      */
177     public LeaseStillHeldException(final String name) {
178       this.leaseName = name;
179     }
180 
181     /** @return name of lease */
182     public String getName() {
183       return this.leaseName;
184     }
185   }
186 
187   /**
188    * Renew a lease
189    *
190    * @param leaseName name of lease
191    * @throws LeaseException
192    */
193   public void renewLease(final String leaseName) throws LeaseException {
194     synchronized (leaseQueue) {
195       Lease lease = leases.get(leaseName);
196       // We need to check to see if the remove is successful as the poll in the run()
197       // method could have completed between the get and the remove which will result
198       // in a corrupt leaseQueue.
199       if (lease == null || !leaseQueue.remove(lease)) {
200         throw new LeaseException("lease '" + leaseName +
201         "' does not exist or has already expired");
202       }
203       lease.setExpirationTime(System.currentTimeMillis() + leasePeriod);
204       leaseQueue.add(lease);
205     }
206   }
207 
208   /**
209    * Client explicitly cancels a lease.
210    * @param leaseName name of lease
211    * @throws LeaseException
212    */
213   public void cancelLease(final String leaseName) throws LeaseException {
214     removeLease(leaseName);
215   }
216 
217   /**
218    * Remove named lease.
219    * Lease is removed from the list of leases and removed from the delay queue.
220    * Lease can be resinserted using {@link #addLease(Lease)}
221    *
222    * @param leaseName name of lease
223    * @throws LeaseException
224    * @return Removed lease
225    */
226   Lease removeLease(final String leaseName) throws LeaseException {
227     Lease lease =  null;
228     synchronized (leaseQueue) {
229       lease = leases.remove(leaseName);
230       if (lease == null) {
231         throw new LeaseException("lease '" + leaseName + "' does not exist");
232       }
233       leaseQueue.remove(lease);
234     }
235     return lease;
236   }
237 
238   /** This class tracks a single Lease. */
239   static class Lease implements Delayed {
240     private final String leaseName;
241     private final LeaseListener listener;
242     private long expirationTime;
243 
244     Lease(final String leaseName, LeaseListener listener) {
245       this(leaseName, listener, 0);
246     }
247 
248     Lease(final String leaseName, LeaseListener listener, long expirationTime) {
249       this.leaseName = leaseName;
250       this.listener = listener;
251       this.expirationTime = expirationTime;
252     }
253 
254     /** @return the lease name */
255     public String getLeaseName() {
256       return leaseName;
257     }
258 
259     /** @return listener */
260     public LeaseListener getListener() {
261       return this.listener;
262     }
263 
264     @Override
265     public boolean equals(Object obj) {
266       if (this == obj) {
267         return true;
268       }
269       if (obj == null) {
270         return false;
271       }
272       if (getClass() != obj.getClass()) {
273         return false;
274       }
275       return this.hashCode() == ((Lease) obj).hashCode();
276     }
277 
278     @Override
279     public int hashCode() {
280       return this.leaseName.hashCode();
281     }
282 
283     public long getDelay(TimeUnit unit) {
284       return unit.convert(this.expirationTime - System.currentTimeMillis(),
285           TimeUnit.MILLISECONDS);
286     }
287 
288     public int compareTo(Delayed o) {
289       long delta = this.getDelay(TimeUnit.MILLISECONDS) -
290         o.getDelay(TimeUnit.MILLISECONDS);
291 
292       return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
293     }
294 
295     /** @param expirationTime the expirationTime to set */
296     public void setExpirationTime(long expirationTime) {
297       this.expirationTime = expirationTime;
298     }
299   }
300 }