1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.util;
20  
21  
22  import java.util.HashMap;
23  import java.util.Map;
24  import java.util.Set;
25  import java.util.SortedSet;
26  import java.util.TreeSet;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  @InterfaceAudience.Private
53  public class KeyLocker<K extends Comparable<? super K>> {
54    private static final Log LOG = LogFactory.getLog(KeyLocker.class);
55  
56    
57    private static final int NB_CONCURRENT_LOCKS = 1000;
58  
59    
60    
61    private final Map<K, Pair<KeyLock<K>, AtomicInteger>> locks =
62      new HashMap<K, Pair<KeyLock<K>, AtomicInteger>>(NB_CONCURRENT_LOCKS);
63  
64    
65  
66  
67  
68  
69    public ReentrantLock acquireLock(K key) {
70      if (key == null) throw new IllegalArgumentException("key must not be null");
71  
72      Pair<KeyLock<K>, AtomicInteger> lock;
73      synchronized (this) {
74        lock = locks.get(key);
75        if (lock == null) {
76          lock = new Pair<KeyLock<K>, AtomicInteger>(
77            new KeyLock<K>(this, key), new AtomicInteger(1));
78          locks.put(key, lock);
79        } else {
80          lock.getSecond().incrementAndGet();
81        }
82      }
83      lock.getFirst().lock();
84      return lock.getFirst();
85    }
86  
87    
88  
89  
90  
91    public Map<K, Lock> acquireLocks(final Set<K> keys) {
92      Map<K, Lock> locks = new HashMap<K, Lock>(keys.size());
93      SortedSet<K> sortedKeys = new TreeSet<K>(keys);
94      for (K key : sortedKeys) {
95        locks.put(key, acquireLock(key));
96      }
97      return locks;
98    }
99  
100   
101 
102 
103   private synchronized void releaseLock(K key) {
104     Pair<KeyLock<K>, AtomicInteger> lock = locks.get(key);
105     if (lock != null) {
106       if (lock.getSecond().decrementAndGet() == 0) {
107         locks.remove(key);
108       }
109     } else {
110       String message = "Can't release the lock for " + key+", this key is not in the key list." +
111         " known keys are: "+ locks.keySet();
112       LOG.error(message);
113       throw new RuntimeException(message);
114     }
115   }
116 
117   static class KeyLock<K extends Comparable<? super K>> extends ReentrantLock {
118     private static final long serialVersionUID = -12432857283423584L;
119 
120     private final transient KeyLocker<K> locker;
121     private final K lockId;
122 
123     private KeyLock(KeyLocker<K> locker, K lockId) {
124       super();
125       this.locker = locker;
126       this.lockId = lockId;
127     }
128 
129     @Override
130     public void unlock() {
131       super.unlock();
132       locker.releaseLock(lockId);
133     }
134   }
135 }