1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27
28 import com.google.common.annotations.VisibleForTesting;
29
30
31
32
33
34
35
36
37
38
39
40
41
42 @InterfaceAudience.Private
43 public class IdLock {
44
45
46 public static class Entry {
47 private final long id;
48 private int numWaiters;
49 private boolean isLocked = true;
50
51 private Entry(long id) {
52 this.id = id;
53 }
54
55 public String toString() {
56 return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
57 + isLocked;
58 }
59 }
60
61 private ConcurrentMap<Long, Entry> map =
62 new ConcurrentHashMap<Long, Entry>();
63
64
65
66
67
68
69
70
71
72 public Entry getLockEntry(long id) throws IOException {
73 Entry entry = new Entry(id);
74 Entry existing;
75 while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
76 synchronized (existing) {
77 if (existing.isLocked) {
78 ++existing.numWaiters;
79 while (existing.isLocked) {
80 try {
81 existing.wait();
82 } catch (InterruptedException e) {
83 --existing.numWaiters;
84 throw new InterruptedIOException(
85 "Interrupted waiting to acquire sparse lock");
86 }
87 }
88
89 --existing.numWaiters;
90 existing.isLocked = true;
91 return existing;
92 }
93
94
95
96 }
97 }
98 return entry;
99 }
100
101
102
103
104
105
106
107
108 public void releaseLockEntry(Entry entry) {
109 synchronized (entry) {
110 entry.isLocked = false;
111 if (entry.numWaiters > 0) {
112 entry.notify();
113 } else {
114 map.remove(entry.id);
115 }
116 }
117 }
118
119
120 void assertMapEmpty() {
121 assert map.size() == 0;
122 }
123
124 @VisibleForTesting
125 public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
126 for (Entry entry;;) {
127 entry = map.get(id);
128 if (entry != null) {
129 synchronized (entry) {
130 if (entry.numWaiters >= numWaiters) {
131 return;
132 }
133 }
134 }
135 Thread.sleep(100);
136 }
137 }
138 }