001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.IOException; 022import java.util.ConcurrentModificationException; 023import java.util.Iterator; 024import java.util.Map; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.Delayed; 027import java.util.concurrent.TimeUnit; 028 029import org.apache.hadoop.hbase.log.HBaseMarkers; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Leases 037 * 038 * There are several server classes in HBase that need to track external 039 * clients that occasionally send heartbeats. 040 * 041 * <p>These external clients hold resources in the server class. 042 * Those resources need to be released if the external client fails to send a 043 * heartbeat after some interval of time passes. 044 * 045 * <p>The Leases class is a general reusable class for this kind of pattern. 046 * An instance of the Leases class will create a thread to do its dirty work. 047 * You should close() the instance if you want to clean up the thread properly. 048 * 049 * <p> 050 * NOTE: This class extends Thread rather than Chore because the sleep time 051 * can be interrupted when there is something to do, rather than the Chore 052 * sleep time which is invariant. 053 */ 054@InterfaceAudience.Private 055public class LeaseManager extends Thread { 056 private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class.getName()); 057 private static final int MIN_WAIT_TIME = 100; 058 059 private final Map<String, Lease> leases = new ConcurrentHashMap<>(); 060 private final int leaseCheckFrequency; 061 private volatile boolean stopRequested = false; 062 063 /** 064 * Creates a lease manager. 065 * 066 * @param leaseCheckFrequency - how often the lease should be checked (milliseconds) 067 */ 068 public LeaseManager(final int leaseCheckFrequency) { 069 super("RegionServer.LeaseManager"); // thread name 070 this.leaseCheckFrequency = leaseCheckFrequency; 071 setDaemon(true); 072 } 073 074 @Override 075 public void run() { 076 long toWait = leaseCheckFrequency; 077 Lease nextLease = null; 078 long nextLeaseDelay = Long.MAX_VALUE; 079 080 while (!stopRequested || (stopRequested && !leases.isEmpty()) ) { 081 082 try { 083 if (nextLease != null) { 084 toWait = nextLease.getDelay(TimeUnit.MILLISECONDS); 085 } 086 087 toWait = Math.min(leaseCheckFrequency, toWait); 088 toWait = Math.max(MIN_WAIT_TIME, toWait); 089 090 Thread.sleep(toWait); 091 } catch (InterruptedException | ConcurrentModificationException e) { 092 continue; 093 } catch (Throwable e) { 094 LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e); 095 break; 096 } 097 098 nextLease = null; 099 nextLeaseDelay = Long.MAX_VALUE; 100 for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) { 101 Map.Entry<String, Lease> entry = it.next(); 102 Lease lease = entry.getValue(); 103 long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS); 104 if ( thisLeaseDelay > 0) { 105 if (nextLease == null || thisLeaseDelay < nextLeaseDelay) { 106 nextLease = lease; 107 nextLeaseDelay = thisLeaseDelay; 108 } 109 } else { 110 // A lease expired. Run the expired code before removing from map 111 // since its presence in map is used to see if lease exists still. 112 if (lease.getListener() == null) { 113 LOG.error("lease listener is null for lease " + lease.getLeaseName()); 114 } else { 115 lease.getListener().leaseExpired(); 116 } 117 it.remove(); 118 } 119 } 120 } 121 close(); 122 } 123 124 /** 125 * Shuts down this lease instance when all outstanding leases expire. 126 * Like {@link #close()} but rather than violently end all leases, waits 127 * first on extant leases to finish. Use this method if the lease holders 128 * could lose data, leak locks, etc. Presumes client has shutdown 129 * allocation of new leases. 130 */ 131 public void closeAfterLeasesExpire() { 132 this.stopRequested = true; 133 } 134 135 /** 136 * Shut down this Leases instance. All pending leases will be destroyed, 137 * without any cancellation calls. 138 */ 139 public void close() { 140 this.stopRequested = true; 141 leases.clear(); 142 LOG.info("Closed leases"); 143 } 144 145 /** 146 * Create a lease and insert it to the map of leases. 147 * 148 * @param leaseName name of the lease 149 * @param leaseTimeoutPeriod length of the lease in milliseconds 150 * @param listener listener that will process lease expirations 151 * @return The lease created. 152 */ 153 public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener) 154 throws LeaseStillHeldException { 155 Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener); 156 addLease(lease); 157 return lease; 158 } 159 160 /** 161 * Inserts lease. Resets expiration before insertion. 162 */ 163 public void addLease(final Lease lease) throws LeaseStillHeldException { 164 if (this.stopRequested) { 165 return; 166 } 167 if (leases.containsKey(lease.getLeaseName())) { 168 throw new LeaseStillHeldException(lease.getLeaseName()); 169 } 170 lease.resetExpirationTime(); 171 leases.put(lease.getLeaseName(), lease); 172 } 173 174 /** 175 * Renew a lease 176 * 177 * @param leaseName name of the lease 178 */ 179 public void renewLease(final String leaseName) throws LeaseException { 180 if (this.stopRequested) { 181 return; 182 } 183 Lease lease = leases.get(leaseName); 184 185 if (lease == null ) { 186 throw new LeaseException("lease '" + leaseName + 187 "' does not exist or has already expired"); 188 } 189 lease.resetExpirationTime(); 190 } 191 192 /** 193 * Client explicitly cancels a lease. 194 * 195 * @param leaseName name of lease 196 */ 197 public void cancelLease(final String leaseName) throws LeaseException { 198 removeLease(leaseName); 199 } 200 201 /** 202 * Remove named lease. Lease is removed from the map of leases. 203 * 204 * @param leaseName name of lease 205 * @return Removed lease 206 */ 207 Lease removeLease(final String leaseName) throws LeaseException { 208 Lease lease = leases.remove(leaseName); 209 if (lease == null) { 210 throw new LeaseException("lease '" + leaseName + "' does not exist"); 211 } 212 return lease; 213 } 214 215 /** 216 * Thrown if we are asked to create a lease but lease on passed name already 217 * exists. 218 */ 219 @SuppressWarnings("serial") 220 public static class LeaseStillHeldException extends IOException { 221 private final String leaseName; 222 223 public LeaseStillHeldException(final String name) { 224 this.leaseName = name; 225 } 226 227 /** @return name of lease */ 228 public String getName() { 229 return this.leaseName; 230 } 231 } 232 233 /** This class tracks a single Lease. */ 234 static class Lease implements Delayed { 235 private final String leaseName; 236 private final LeaseListener listener; 237 private int leaseTimeoutPeriod; 238 private long expirationTime; 239 240 Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) { 241 this.leaseName = leaseName; 242 this.listener = listener; 243 this.leaseTimeoutPeriod = leaseTimeoutPeriod; 244 this.expirationTime = 0; 245 } 246 247 /** @return the lease name */ 248 public String getLeaseName() { 249 return leaseName; 250 } 251 252 /** @return listener */ 253 public LeaseListener getListener() { 254 return this.listener; 255 } 256 257 @Override 258 public boolean equals(Object obj) { 259 if (this == obj) { 260 return true; 261 } 262 if (obj == null) { 263 return false; 264 } 265 if (getClass() != obj.getClass()) { 266 return false; 267 } 268 return this.hashCode() == obj.hashCode(); 269 } 270 271 @Override 272 public int hashCode() { 273 return this.leaseName.hashCode(); 274 } 275 276 @Override 277 public long getDelay(TimeUnit unit) { 278 return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(), 279 TimeUnit.MILLISECONDS); 280 } 281 282 @Override 283 public int compareTo(Delayed o) { 284 long delta = this.getDelay(TimeUnit.MILLISECONDS) - 285 o.getDelay(TimeUnit.MILLISECONDS); 286 287 return this.equals(o) ? 0 : (delta > 0 ? 1 : -1); 288 } 289 290 /** 291 * Resets the expiration time of the lease. 292 */ 293 public void resetExpirationTime() { 294 this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod; 295 } 296 } 297}