View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
25  import org.apache.hadoop.hbase.util.HasThread;
26  
27  import java.util.ConcurrentModificationException;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Delayed;
32  import java.util.concurrent.TimeUnit;
33  
34  import java.io.IOException;
35  
36  /**
37   * Leases
38   *
39   * There are several server classes in HBase that need to track external
40   * clients that occasionally send heartbeats.
41   *
42   * <p>These external clients hold resources in the server class.
43   * Those resources need to be released if the external client fails to send a
44   * heartbeat after some interval of time passes.
45   *
46   * <p>The Leases class is a general reusable class for this kind of pattern.
47   * An instance of the Leases class will create a thread to do its dirty work.
48   * You should close() the instance if you want to clean up the thread properly.
49   *
50   * <p>
 * NOTE: This class runs its own thread (it extends HasThread) rather than being a
 * Chore because its sleep can be interrupted when there is something to do, unlike
 * the Chore sleep time, which is invariant.
54   */
55  @InterfaceAudience.Private
56  public class Leases extends HasThread {
57    private static final Log LOG = LogFactory.getLog(Leases.class.getName());
58    public static final int MIN_WAIT_TIME = 100;
59    private final Map<String, Lease> leases = new ConcurrentHashMap<String, Lease>();
60  
61    protected final int leaseCheckFrequency;
62    protected volatile boolean stopRequested = false;
63  
64    /**
65     * Creates a lease monitor
66     * 
67     * @param leaseCheckFrequency - how often the lease should be checked
68     *          (milliseconds)
69     */
70    public Leases(final int leaseCheckFrequency) {
71      this.leaseCheckFrequency = leaseCheckFrequency;
72      setDaemon(true);
73    }
74  
75    /**
76     * @see Thread#run()
77     */
78    @Override
79    public void run() {
80      long toWait = leaseCheckFrequency;
81      Lease nextLease = null;
82      long nextLeaseDelay = Long.MAX_VALUE;
83  
84      while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
85  
86        try {
87          if (nextLease != null) {
88            toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
89          }
90  
91          toWait = Math.min(leaseCheckFrequency, toWait);
92          toWait = Math.max(MIN_WAIT_TIME, toWait);
93  
94          Thread.sleep(toWait);
95        } catch (InterruptedException e) {
96          continue;
97        } catch (ConcurrentModificationException e) {
98          continue;
99        } catch (Throwable e) {
100         LOG.fatal("Unexpected exception killed leases thread", e);
101         break;
102       }
103 
104       nextLease = null;
105       nextLeaseDelay = Long.MAX_VALUE;
106       for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
107         Map.Entry<String, Lease> entry = it.next();
108         Lease lease = entry.getValue();
109         long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
110         if ( thisLeaseDelay > 0) {
111           if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
112             nextLease = lease;
113             nextLeaseDelay = thisLeaseDelay;
114           }
115         } else {
116           // A lease expired.  Run the expired code before removing from map
117           // since its presence in map is used to see if lease exists still.
118           if (lease.getListener() == null) {
119             LOG.error("lease listener is null for lease " + lease.getLeaseName());
120           } else {
121             lease.getListener().leaseExpired();
122           }
123           it.remove();
124         }
125       }
126     }
127     close();
128   }
129 
130   /**
131    * Shuts down this lease instance when all outstanding leases expire.
132    * Like {@link #close()} but rather than violently end all leases, waits
133    * first on extant leases to finish.  Use this method if the lease holders
134    * could lose data, leak locks, etc.  Presumes client has shutdown
135    * allocation of new leases.
136    */
137   public void closeAfterLeasesExpire() {
138     this.stopRequested = true;
139   }
140 
141   /**
142    * Shut down this Leases instance.  All pending leases will be destroyed,
143    * without any cancellation calls.
144    */
145   public void close() {
146     LOG.info(Thread.currentThread().getName() + " closing leases");
147     this.stopRequested = true;
148     leases.clear();
149     LOG.info(Thread.currentThread().getName() + " closed leases");
150   }
151 
152   /**
153    * Create a lease and insert it to the map of leases.
154    *
155    * @param leaseName name of the lease
156    * @param leaseTimeoutPeriod length of the lease in milliseconds
157    * @param listener listener that will process lease expirations
158    * @throws LeaseStillHeldException
159    */
160   public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
161       throws LeaseStillHeldException {
162     addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
163   }
164 
165   /**
166    * Inserts lease.  Resets expiration before insertion.
167    * @param lease
168    * @throws LeaseStillHeldException
169    */
170   public void addLease(final Lease lease) throws LeaseStillHeldException {
171     if (this.stopRequested) {
172       return;
173     }
174     if (leases.containsKey(lease.getLeaseName())) {
175       throw new LeaseStillHeldException(lease.getLeaseName());
176     }
177     lease.resetExpirationTime();
178     leases.put(lease.getLeaseName(), lease);
179   }
180 
181   /**
182    * Renew a lease
183    *
184    * @param leaseName name of lease
185    * @throws LeaseException
186    */
187   public void renewLease(final String leaseName) throws LeaseException {
188     if (this.stopRequested) {
189       return;
190     }
191     Lease lease = leases.get(leaseName);
192 
193     if (lease == null ) {
194       throw new LeaseException("lease '" + leaseName +
195           "' does not exist or has already expired");
196     }
197     lease.resetExpirationTime();
198   }
199 
200   /**
201    * Client explicitly cancels a lease.
202    * @param leaseName name of lease
203    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
204    */
205   public void cancelLease(final String leaseName) throws LeaseException {
206     removeLease(leaseName);
207   }
208 
209   /**
210    * Remove named lease.
211    * Lease is removed from the map of leases.
212    * Lease can be reinserted using {@link #addLease(Lease)}
213    *
214    * @param leaseName name of lease
215    * @throws org.apache.hadoop.hbase.regionserver.LeaseException
216    * @return Removed lease
217    */
218   Lease removeLease(final String leaseName) throws LeaseException {
219     Lease lease = leases.remove(leaseName);
220     if (lease == null) {
221       throw new LeaseException("lease '" + leaseName + "' does not exist");
222     }
223     return lease;
224   }
225 
226   /**
227    * Thrown if we are asked to create a lease but lease on passed name already
228    * exists.
229    */
230   @SuppressWarnings("serial")
231   public static class LeaseStillHeldException extends IOException {
232     private final String leaseName;
233 
234     /**
235      * @param name
236      */
237     public LeaseStillHeldException(final String name) {
238       this.leaseName = name;
239     }
240 
241     /** @return name of lease */
242     public String getName() {
243       return this.leaseName;
244     }
245   }
246 
247   /** This class tracks a single Lease. */
248   static class Lease implements Delayed {
249     private final String leaseName;
250     private final LeaseListener listener;
251     private int leaseTimeoutPeriod;
252     private long expirationTime;
253 
254     Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
255       this.leaseName = leaseName;
256       this.listener = listener;
257       this.leaseTimeoutPeriod = leaseTimeoutPeriod;
258       this.expirationTime = 0;
259     }
260 
261     /** @return the lease name */
262     public String getLeaseName() {
263       return leaseName;
264     }
265 
266     /** @return listener */
267     public LeaseListener getListener() {
268       return this.listener;
269     }
270 
271     @Override
272     public boolean equals(Object obj) {
273       if (this == obj) {
274         return true;
275       }
276       if (obj == null) {
277         return false;
278       }
279       if (getClass() != obj.getClass()) {
280         return false;
281       }
282       return this.hashCode() == obj.hashCode();
283     }
284 
285     @Override
286     public int hashCode() {
287       return this.leaseName.hashCode();
288     }
289 
290     public long getDelay(TimeUnit unit) {
291       return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
292           TimeUnit.MILLISECONDS);
293     }
294 
295     public int compareTo(Delayed o) {
296       long delta = this.getDelay(TimeUnit.MILLISECONDS) -
297         o.getDelay(TimeUnit.MILLISECONDS);
298 
299       return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
300     }
301 
302     /**
303      * Resets the expiration time of the lease.
304      */
305     public void resetExpirationTime() {
306       this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
307     }
308   }
309 }