1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.SortedSet;
26 import java.util.TreeSet;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.locks.Lock;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 @InterfaceAudience.Private
53 public class KeyLocker<K extends Comparable<? super K>> {
54 private static final Log LOG = LogFactory.getLog(KeyLocker.class);
55
56
57 private static final int NB_CONCURRENT_LOCKS = 1000;
58
59
60
61 private final Map<K, Pair<KeyLock<K>, AtomicInteger>> locks =
62 new HashMap<K, Pair<KeyLock<K>, AtomicInteger>>(NB_CONCURRENT_LOCKS);
63
64
65
66
67
68
69 public ReentrantLock acquireLock(K key) {
70 if (key == null) throw new IllegalArgumentException("key must not be null");
71
72 Pair<KeyLock<K>, AtomicInteger> lock;
73 synchronized (this) {
74 lock = locks.get(key);
75 if (lock == null) {
76 lock = new Pair<KeyLock<K>, AtomicInteger>(
77 new KeyLock<K>(this, key), new AtomicInteger(1));
78 locks.put(key, lock);
79 } else {
80 lock.getSecond().incrementAndGet();
81 }
82 }
83 lock.getFirst().lock();
84 return lock.getFirst();
85 }
86
87
88
89
90
91 public Map<K, Lock> acquireLocks(final Set<K> keys) {
92 Map<K, Lock> locks = new HashMap<K, Lock>(keys.size());
93 SortedSet<K> sortedKeys = new TreeSet<K>(keys);
94 for (K key : sortedKeys) {
95 locks.put(key, acquireLock(key));
96 }
97 return locks;
98 }
99
100
101
102
103 private synchronized void releaseLock(K key) {
104 Pair<KeyLock<K>, AtomicInteger> lock = locks.get(key);
105 if (lock != null) {
106 if (lock.getSecond().decrementAndGet() == 0) {
107 locks.remove(key);
108 }
109 } else {
110 String message = "Can't release the lock for " + key+", this key is not in the key list." +
111 " known keys are: "+ locks.keySet();
112 LOG.error(message);
113 throw new RuntimeException(message);
114 }
115 }
116
117 static class KeyLock<K extends Comparable<? super K>> extends ReentrantLock {
118 private static final long serialVersionUID = -12432857283423584L;
119
120 private final transient KeyLocker<K> locker;
121 private final K lockId;
122
123 private KeyLock(KeyLocker<K> locker, K lockId) {
124 super();
125 this.locker = locker;
126 this.lockId = lockId;
127 }
128
129 @Override
130 public void unlock() {
131 super.unlock();
132 locker.releaseLock(lockId);
133 }
134 }
135 }