001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.ConcurrentModificationException;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.Delayed;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.hadoop.hbase.log.HBaseMarkers;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.HasThread;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * Leases
038 *
039 * There are several server classes in HBase that need to track external
040 * clients that occasionally send heartbeats.
041 *
042 * <p>These external clients hold resources in the server class.
043 * Those resources need to be released if the external client fails to send a
044 * heartbeat after some interval of time passes.
045 *
046 * <p>The Leases class is a general reusable class for this kind of pattern.
047 * An instance of the Leases class will create a thread to do its dirty work.
048 * You should close() the instance if you want to clean up the thread properly.
049 *
050 * <p>
051 * NOTE: This class extends Thread rather than Chore because the sleep time
052 * can be interrupted when there is something to do, rather than the Chore
053 * sleep time which is invariant.
054 */
055@InterfaceAudience.Private
056public class Leases extends HasThread {
057  private static final Logger LOG = LoggerFactory.getLogger(Leases.class.getName());
058  public static final int MIN_WAIT_TIME = 100;
059  private final Map<String, Lease> leases = new ConcurrentHashMap<>();
060
061  protected final int leaseCheckFrequency;
062  protected volatile boolean stopRequested = false;
063
064  /**
065   * Creates a lease monitor
066   *
067   * @param leaseCheckFrequency - how often the lease should be checked
068   *          (milliseconds)
069   */
070  public Leases(final int leaseCheckFrequency) {
071    super("RegionServerLeases");  // thread name
072    this.leaseCheckFrequency = leaseCheckFrequency;
073    setDaemon(true);
074  }
075
076  /**
077   * @see Thread#run()
078   */
079  @Override
080  public void run() {
081    long toWait = leaseCheckFrequency;
082    Lease nextLease = null;
083    long nextLeaseDelay = Long.MAX_VALUE;
084
085    while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
086
087      try {
088        if (nextLease != null) {
089          toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
090        }
091
092        toWait = Math.min(leaseCheckFrequency, toWait);
093        toWait = Math.max(MIN_WAIT_TIME, toWait);
094
095        Thread.sleep(toWait);
096      } catch (InterruptedException e) {
097        continue;
098      } catch (ConcurrentModificationException e) {
099        continue;
100      } catch (Throwable e) {
101        LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e);
102        break;
103      }
104
105      nextLease = null;
106      nextLeaseDelay = Long.MAX_VALUE;
107      for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
108        Map.Entry<String, Lease> entry = it.next();
109        Lease lease = entry.getValue();
110        long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
111        if ( thisLeaseDelay > 0) {
112          if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
113            nextLease = lease;
114            nextLeaseDelay = thisLeaseDelay;
115          }
116        } else {
117          // A lease expired.  Run the expired code before removing from map
118          // since its presence in map is used to see if lease exists still.
119          if (lease.getListener() == null) {
120            LOG.error("lease listener is null for lease " + lease.getLeaseName());
121          } else {
122            lease.getListener().leaseExpired();
123          }
124          it.remove();
125        }
126      }
127    }
128    close();
129  }
130
131  /**
132   * Shuts down this lease instance when all outstanding leases expire.
133   * Like {@link #close()} but rather than violently end all leases, waits
134   * first on extant leases to finish.  Use this method if the lease holders
135   * could lose data, leak locks, etc.  Presumes client has shutdown
136   * allocation of new leases.
137   */
138  public void closeAfterLeasesExpire() {
139    this.stopRequested = true;
140  }
141
142  /**
143   * Shut down this Leases instance.  All pending leases will be destroyed,
144   * without any cancellation calls.
145   */
146  public void close() {
147    this.stopRequested = true;
148    leases.clear();
149    LOG.info("Closed leases");
150  }
151
152  /**
153   * Create a lease and insert it to the map of leases.
154   *
155   * @param leaseName name of the lease
156   * @param leaseTimeoutPeriod length of the lease in milliseconds
157   * @param listener listener that will process lease expirations
158   * @return The lease created.
159   * @throws LeaseStillHeldException
160   */
161  public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
162      throws LeaseStillHeldException {
163    Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
164    addLease(lease);
165    return lease;
166  }
167
168  /**
169   * Inserts lease.  Resets expiration before insertion.
170   * @param lease
171   * @throws LeaseStillHeldException
172   */
173  public void addLease(final Lease lease) throws LeaseStillHeldException {
174    if (this.stopRequested) {
175      return;
176    }
177    if (leases.containsKey(lease.getLeaseName())) {
178      throw new LeaseStillHeldException(lease.getLeaseName());
179    }
180    lease.resetExpirationTime();
181    leases.put(lease.getLeaseName(), lease);
182  }
183
184  /**
185   * Renew a lease
186   *
187   * @param leaseName name of lease
188   * @throws LeaseException
189   */
190  public void renewLease(final String leaseName) throws LeaseException {
191    if (this.stopRequested) {
192      return;
193    }
194    Lease lease = leases.get(leaseName);
195
196    if (lease == null ) {
197      throw new LeaseException("lease '" + leaseName +
198          "' does not exist or has already expired");
199    }
200    lease.resetExpirationTime();
201  }
202
203  /**
204   * Client explicitly cancels a lease.
205   * @param leaseName name of lease
206   * @throws org.apache.hadoop.hbase.regionserver.LeaseException
207   */
208  public void cancelLease(final String leaseName) throws LeaseException {
209    removeLease(leaseName);
210  }
211
212  /**
213   * Remove named lease.
214   * Lease is removed from the map of leases.
215   * Lease can be reinserted using {@link #addLease(Lease)}
216   *
217   * @param leaseName name of lease
218   * @throws org.apache.hadoop.hbase.regionserver.LeaseException
219   * @return Removed lease
220   */
221  Lease removeLease(final String leaseName) throws LeaseException {
222    Lease lease = leases.remove(leaseName);
223    if (lease == null) {
224      throw new LeaseException("lease '" + leaseName + "' does not exist");
225    }
226    return lease;
227  }
228
229  /**
230   * Thrown if we are asked to create a lease but lease on passed name already
231   * exists.
232   */
233  @SuppressWarnings("serial")
234  public static class LeaseStillHeldException extends IOException {
235    private final String leaseName;
236
237    /**
238     * @param name
239     */
240    public LeaseStillHeldException(final String name) {
241      this.leaseName = name;
242    }
243
244    /** @return name of lease */
245    public String getName() {
246      return this.leaseName;
247    }
248  }
249
250  /** This class tracks a single Lease. */
251  static class Lease implements Delayed {
252    private final String leaseName;
253    private final LeaseListener listener;
254    private int leaseTimeoutPeriod;
255    private long expirationTime;
256
257    Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
258      this.leaseName = leaseName;
259      this.listener = listener;
260      this.leaseTimeoutPeriod = leaseTimeoutPeriod;
261      this.expirationTime = 0;
262    }
263
264    /** @return the lease name */
265    public String getLeaseName() {
266      return leaseName;
267    }
268
269    /** @return listener */
270    public LeaseListener getListener() {
271      return this.listener;
272    }
273
274    @Override
275    public boolean equals(Object obj) {
276      if (this == obj) {
277        return true;
278      }
279      if (obj == null) {
280        return false;
281      }
282      if (getClass() != obj.getClass()) {
283        return false;
284      }
285      return this.hashCode() == obj.hashCode();
286    }
287
288    @Override
289    public int hashCode() {
290      return this.leaseName.hashCode();
291    }
292
293    @Override
294    public long getDelay(TimeUnit unit) {
295      return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
296          TimeUnit.MILLISECONDS);
297    }
298
299    @Override
300    public int compareTo(Delayed o) {
301      long delta = this.getDelay(TimeUnit.MILLISECONDS) -
302        o.getDelay(TimeUnit.MILLISECONDS);
303
304      return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
305    }
306
307    /**
308     * Resets the expiration time of the lease.
309     */
310    public void resetExpirationTime() {
311      this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
312    }
313  }
314}