001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.ConcurrentModificationException; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.Delayed; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.hbase.log.HBaseMarkers; 028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Leases There are several server classes in HBase that need to track external clients that 035 * occasionally send heartbeats. 036 * <p> 037 * These external clients hold resources in the server class. Those resources need to be released if 038 * the external client fails to send a heartbeat after some interval of time passes. 039 * <p> 040 * The Leases class is a general reusable class for this kind of pattern. An instance of the Leases 041 * class will create a thread to do its dirty work. You should close() the instance if you want to 042 * clean up the thread properly. 043 * <p> 044 * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when 045 * there is something to do, rather than the Chore sleep time which is invariant. 046 */ 047@InterfaceAudience.Private 048public class LeaseManager extends Thread { 049 private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class.getName()); 050 private static final int MIN_WAIT_TIME = 100; 051 052 private final Map<String, Lease> leases = new ConcurrentHashMap<>(); 053 private final int leaseCheckFrequency; 054 private volatile boolean stopRequested = false; 055 056 /** 057 * Creates a lease manager. 058 * @param leaseCheckFrequency - how often the lease should be checked (milliseconds) 059 */ 060 public LeaseManager(final int leaseCheckFrequency) { 061 super("RegionServer.LeaseManager"); // thread name 062 this.leaseCheckFrequency = leaseCheckFrequency; 063 setDaemon(true); 064 } 065 066 @Override 067 public void run() { 068 long toWait = leaseCheckFrequency; 069 Lease nextLease = null; 070 long nextLeaseDelay = Long.MAX_VALUE; 071 072 while (!stopRequested || (stopRequested && !leases.isEmpty())) { 073 074 try { 075 if (nextLease != null) { 076 toWait = nextLease.getDelay(TimeUnit.MILLISECONDS); 077 } 078 079 toWait = Math.min(leaseCheckFrequency, toWait); 080 toWait = Math.max(MIN_WAIT_TIME, toWait); 081 082 Thread.sleep(toWait); 083 } catch (InterruptedException | ConcurrentModificationException e) { 084 continue; 085 } catch (Throwable e) { 086 LOG.error(HBaseMarkers.FATAL, "Unexpected exception killed leases thread", e); 087 break; 088 } 089 090 nextLease = null; 091 nextLeaseDelay = Long.MAX_VALUE; 092 for (Iterator<Map.Entry<String, Lease>> it = leases.entrySet().iterator(); it.hasNext();) { 093 Map.Entry<String, Lease> entry = it.next(); 094 Lease lease = entry.getValue(); 095 long thisLeaseDelay = lease.getDelay(TimeUnit.MILLISECONDS); 096 if (thisLeaseDelay > 0) { 097 if (nextLease == null || thisLeaseDelay < nextLeaseDelay) { 098 nextLease = lease; 099 nextLeaseDelay = thisLeaseDelay; 100 } 101 } else { 102 // A lease expired. Run the expired code before removing from map 103 // since its presence in map is used to see if lease exists still. 104 if (lease.getListener() == null) { 105 LOG.error("lease listener is null for lease " + lease.getLeaseName()); 106 } else { 107 lease.getListener().leaseExpired(); 108 } 109 it.remove(); 110 } 111 } 112 } 113 close(); 114 } 115 116 /** 117 * Shuts down this lease instance when all outstanding leases expire. Like {@link #close()} but 118 * rather than violently end all leases, waits first on extant leases to finish. Use this method 119 * if the lease holders could lose data, leak locks, etc. Presumes client has shutdown allocation 120 * of new leases. 121 */ 122 public void closeAfterLeasesExpire() { 123 this.stopRequested = true; 124 } 125 126 /** 127 * Shut down this Leases instance. All pending leases will be destroyed, without any cancellation 128 * calls. 129 */ 130 public void close() { 131 this.stopRequested = true; 132 leases.clear(); 133 LOG.info("Closed leases"); 134 } 135 136 /** 137 * Create a lease and insert it to the map of leases. 138 * @param leaseName name of the lease 139 * @param leaseTimeoutPeriod length of the lease in milliseconds 140 * @param listener listener that will process lease expirations 141 * @return The lease created. 142 */ 143 public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener) 144 throws LeaseStillHeldException { 145 Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener); 146 addLease(lease); 147 return lease; 148 } 149 150 /** 151 * Inserts lease. Resets expiration before insertion. 152 */ 153 public void addLease(final Lease lease) throws LeaseStillHeldException { 154 if (this.stopRequested) { 155 return; 156 } 157 if (leases.containsKey(lease.getLeaseName())) { 158 throw new LeaseStillHeldException(lease.getLeaseName()); 159 } 160 lease.resetExpirationTime(); 161 leases.put(lease.getLeaseName(), lease); 162 } 163 164 /** 165 * Renew a lease 166 * @param leaseName name of the lease 167 */ 168 public void renewLease(final String leaseName) throws LeaseException { 169 if (this.stopRequested) { 170 return; 171 } 172 Lease lease = leases.get(leaseName); 173 174 if (lease == null) { 175 throw new LeaseException("lease '" + leaseName + "' does not exist or has already expired"); 176 } 177 lease.resetExpirationTime(); 178 } 179 180 /** 181 * Client explicitly cancels a lease. 182 * @param leaseName name of lease 183 */ 184 public void cancelLease(final String leaseName) throws LeaseException { 185 removeLease(leaseName); 186 } 187 188 /** 189 * Remove named lease. Lease is removed from the map of leases. 190 * @param leaseName name of lease 191 * @return Removed lease 192 */ 193 Lease removeLease(final String leaseName) throws LeaseException { 194 Lease lease = leases.remove(leaseName); 195 if (lease == null) { 196 throw new LeaseException("lease '" + leaseName + "' does not exist"); 197 } 198 return lease; 199 } 200 201 /** 202 * Thrown if we are asked to create a lease but lease on passed name already exists. 203 */ 204 @SuppressWarnings("serial") 205 public static class LeaseStillHeldException extends IOException { 206 private final String leaseName; 207 208 public LeaseStillHeldException(final String name) { 209 this.leaseName = name; 210 } 211 212 /** Returns name of lease */ 213 public String getName() { 214 return this.leaseName; 215 } 216 } 217 218 /** This class tracks a single Lease. */ 219 static class Lease implements Delayed { 220 private final String leaseName; 221 private final LeaseListener listener; 222 private int leaseTimeoutPeriod; 223 private long expirationTime; 224 225 Lease(final String leaseName, int leaseTimeoutPeriod, LeaseListener listener) { 226 this.leaseName = leaseName; 227 this.listener = listener; 228 this.leaseTimeoutPeriod = leaseTimeoutPeriod; 229 this.expirationTime = 0; 230 } 231 232 /** Returns the lease name */ 233 public String getLeaseName() { 234 return leaseName; 235 } 236 237 /** Returns listener */ 238 public LeaseListener getListener() { 239 return this.listener; 240 } 241 242 @Override 243 public boolean equals(Object obj) { 244 if (this == obj) { 245 return true; 246 } 247 if (obj == null) { 248 return false; 249 } 250 if (getClass() != obj.getClass()) { 251 return false; 252 } 253 return this.hashCode() == obj.hashCode(); 254 } 255 256 @Override 257 public int hashCode() { 258 return this.leaseName.hashCode(); 259 } 260 261 @Override 262 public long getDelay(TimeUnit unit) { 263 return unit.convert(this.expirationTime - EnvironmentEdgeManager.currentTime(), 264 TimeUnit.MILLISECONDS); 265 } 266 267 @Override 268 public int compareTo(Delayed o) { 269 long delta = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); 270 271 return this.equals(o) ? 0 : (delta > 0 ? 1 : -1); 272 } 273 274 /** 275 * Resets the expiration time of the lease. 276 */ 277 public void resetExpirationTime() { 278 this.expirationTime = EnvironmentEdgeManager.currentTime() + this.leaseTimeoutPeriod; 279 } 280 } 281}