001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.io.IOException;
022import java.util.ConcurrentModificationException;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.Delayed;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.hadoop.hbase.log.HBaseMarkers;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.HasThread;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * Leases
038 *
039 * There are several server classes in HBase that need to track external
040 * clients that occasionally send heartbeats.
041 *
042 * <p>These external clients hold resources in the server class.
043 * Those resources need to be released if the external client fails to send a
044 * heartbeat after some interval of time passes.
045 *
046 * <p>The Leases class is a general reusable class for this kind of pattern.
047 * An instance of the Leases class will create a thread to do its dirty work.
048 * You should close() the instance if you want to clean up the thread properly.
049 *
050 * <p>
051 * NOTE: This class extends Thread rather than Chore because the sleep time
052 * can be interrupted when there is something to do, rather than the Chore
053 * sleep time which is invariant.
054 */
055@InterfaceAudience.Private
056public class LeaseManager extends HasThread {
057  private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class.getName());
058  private static final int MIN_WAIT_TIME = 100;
059
060  private final Map<String, Lease> leases = new ConcurrentHashMap<>();
061  private final int leaseCheckFrequency;
062  private volatile boolean stopRequested = false;
063
064  /**
065   * Creates a lease manager.
066   *
067   * @param leaseCheckFrequency - how often the lease should be checked (milliseconds)
068   */
069  public LeaseManager(final int leaseCheckFrequency) {
070    super("RegionServer.LeaseManager");  // thread name
071    this.leaseCheckFrequency = leaseCheckFrequency;
072    setDaemon(true);
073  }
074
075  @Override
076  public void run() {
077    long toWait = leaseCheckFrequency;
078    Lease nextLease = null;
079    long nextLeaseDelay = Long.MAX_VALUE;
080
081    while (!stopRequested || (stopRequested && !leases.isEmpty()) ) {
082
083      try {
084        if (nextLease != null) {
085          toWait = nextLease.getDelay(TimeUnit.MILLISECONDS);
086        }
087
088        toWait = Math.min(leaseCheckFrequency, toWait);
089        toWait = Math.max(MIN_WAIT_TIME, toWait);
090
091        Thread.sleep(toWait);
092      } catch (InterruptedException | ConcurrentModificationException e) {
093        continue;
094      } catch (Throwable e) {
095        LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e);
096        break;
097      }
098
099      nextLease = null;
100      nextLeaseDelay = Long.MAX_VALUE;
101      for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) {
102        Map.Entry<String, Lease> entry = it.next();
103        Lease lease = entry.getValue();
104        long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS);
105        if ( thisLeaseDelay > 0) {
106          if (nextLease == null || thisLeaseDelay < nextLeaseDelay) {
107            nextLease = lease;
108            nextLeaseDelay = thisLeaseDelay;
109          }
110        } else {
111          // A lease expired.  Run the expired code before removing from map
112          // since its presence in map is used to see if lease exists still.
113          if (lease.getListener() == null) {
114            LOG.error("lease listener is null for lease " + lease.getLeaseName());
115          } else {
116            lease.getListener().leaseExpired();
117          }
118          it.remove();
119        }
120      }
121    }
122    close();
123  }
124
125  /**
126   * Shuts down this lease instance when all outstanding leases expire.
127   * Like {@link #close()} but rather than violently end all leases, waits
128   * first on extant leases to finish.  Use this method if the lease holders
129   * could lose data, leak locks, etc.  Presumes client has shutdown
130   * allocation of new leases.
131   */
132  public void closeAfterLeasesExpire() {
133    this.stopRequested = true;
134  }
135
136  /**
137   * Shut down this Leases instance.  All pending leases will be destroyed,
138   * without any cancellation calls.
139   */
140  public void close() {
141    this.stopRequested = true;
142    leases.clear();
143    LOG.info("Closed leases");
144  }
145
146  /**
147   * Create a lease and insert it to the map of leases.
148   *
149   * @param leaseName name of the lease
150   * @param leaseTimeoutPeriod length of the lease in milliseconds
151   * @param listener listener that will process lease expirations
152   * @return The lease created.
153   */
154  public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
155      throws LeaseStillHeldException {
156    Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
157    addLease(lease);
158    return lease;
159  }
160
161  /**
162   * Inserts lease.  Resets expiration before insertion.
163   */
164  public void addLease(final Lease lease) throws LeaseStillHeldException {
165    if (this.stopRequested) {
166      return;
167    }
168    if (leases.containsKey(lease.getLeaseName())) {
169      throw new LeaseStillHeldException(lease.getLeaseName());
170    }
171    lease.resetExpirationTime();
172    leases.put(lease.getLeaseName(), lease);
173  }
174
175  /**
176   * Renew a lease
177   *
178   * @param leaseName name of the lease
179   */
180  public void renewLease(final String leaseName) throws LeaseException {
181    if (this.stopRequested) {
182      return;
183    }
184    Lease lease = leases.get(leaseName);
185
186    if (lease == null ) {
187      throw new LeaseException("lease '" + leaseName +
188          "' does not exist or has already expired");
189    }
190    lease.resetExpirationTime();
191  }
192
193  /**
194   * Client explicitly cancels a lease.
195   *
196   * @param leaseName name of lease
197   */
198  public void cancelLease(final String leaseName) throws LeaseException {
199    removeLease(leaseName);
200  }
201
202  /**
203   * Remove named lease. Lease is removed from the map of leases.
204   *
205   * @param leaseName name of lease
206   * @return Removed lease
207   */
208  Lease removeLease(final String leaseName) throws LeaseException {
209    Lease lease = leases.remove(leaseName);
210    if (lease == null) {
211      throw new LeaseException("lease '" + leaseName + "' does not exist");
212    }
213    return lease;
214  }
215
216  /**
217   * Thrown if we are asked to create a lease but lease on passed name already
218   * exists.
219   */
220  @SuppressWarnings("serial")
221  public static class LeaseStillHeldException extends IOException {
222    private final String leaseName;
223
224    public LeaseStillHeldException(final String name) {
225      this.leaseName = name;
226    }
227
228    /** @return name of lease */
229    public String getName() {
230      return this.leaseName;
231    }
232  }
233
234  /** This class tracks a single Lease. */
235  static class Lease implements Delayed {
236    private final String leaseName;
237    private final LeaseListener listener;
238    private int leaseTimeoutPeriod;
239    private long expirationTime;
240
241    Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) {
242      this.leaseName = leaseName;
243      this.listener = listener;
244      this.leaseTimeoutPeriod = leaseTimeoutPeriod;
245      this.expirationTime = 0;
246    }
247
248    /** @return the lease name */
249    public String getLeaseName() {
250      return leaseName;
251    }
252
253    /** @return listener */
254    public LeaseListener getListener() {
255      return this.listener;
256    }
257
258    @Override
259    public boolean equals(Object obj) {
260      if (this == obj) {
261        return true;
262      }
263      if (obj == null) {
264        return false;
265      }
266      if (getClass() != obj.getClass()) {
267        return false;
268      }
269      return this.hashCode() == obj.hashCode();
270    }
271
272    @Override
273    public int hashCode() {
274      return this.leaseName.hashCode();
275    }
276
277    @Override
278    public long getDelay(TimeUnit unit) {
279      return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(),
280          TimeUnit.MILLISECONDS);
281    }
282
283    @Override
284    public int compareTo(Delayed o) {
285      long delta = this.getDelay(TimeUnit.MILLISECONDS) -
286        o.getDelay(TimeUnit.MILLISECONDS);
287
288      return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
289    }
290
291    /**
292     * Resets the expiration time of the lease.
293     */
294    public void resetExpirationTime() {
295      this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod;
296    }
297  }
298}