1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.util.LinkedList;
22 import java.util.concurrent.atomic.AtomicLong;
23
24 import com.google.common.annotations.VisibleForTesting;
25
26 import com.google.common.base.Objects;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.hbase.util.ClassSize;
32
33
34
35
36
37
38
39 @InterfaceAudience.Private
40 public class MultiVersionConcurrencyControl {
41 private static final Log LOG = LogFactory.getLog(MultiVersionConcurrencyControl.class);
42 static final long NO_WRITE_NUMBER = 0;
43
44 final AtomicLong readPoint = new AtomicLong(0);
45 final AtomicLong writePoint = new AtomicLong(0);
46 private final Object readWaiters = new Object();
47
48
49
50 public static final long NONE = -1;
51
52
53
54
55
56
57
58 private final LinkedList<WriteEntry> writeQueue = new LinkedList<WriteEntry>();
59
60 public MultiVersionConcurrencyControl() {
61 super();
62 }
63
64
65
66
67 public MultiVersionConcurrencyControl(long startPoint) {
68 tryAdvanceTo(startPoint, NONE);
69 }
70
71
72
73
74
75 public void advanceTo(long newStartPoint) {
76 while (true) {
77 long seqId = this.getWritePoint();
78 if (seqId >= newStartPoint) break;
79 if (this.tryAdvanceTo(
80 }
81 }
82
83
84
85
86
87
88
89
90
91 boolean tryAdvanceTo(long newStartPoint, long expected) {
92 synchronized (writeQueue) {
93 long currentRead = this.readPoint.get();
94 long currentWrite = this.writePoint.get();
95 if (currentRead != currentWrite) {
96 throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
97 ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
98 }
99 if (expected != NONE && expected != currentRead) {
100 return false;
101 }
102
103 if (newStartPoint < currentRead) {
104 return false;
105 }
106
107 readPoint.set(newStartPoint);
108 writePoint.set(newStartPoint);
109 }
110 return true;
111 }
112
113
114
115
116
117
118
119
120
121
122
123 public WriteEntry begin() {
124 synchronized (writeQueue) {
125 long nextWriteNumber = writePoint.incrementAndGet();
126 WriteEntry e = new WriteEntry(nextWriteNumber);
127 writeQueue.add(e);
128 return e;
129 }
130 }
131
132
133
134
135
136 public void await() {
137
138 completeAndWait(begin());
139 }
140
141
142
143
144
145
146
147
148 public void completeAndWait(WriteEntry e) {
149 complete(e);
150 waitForRead(e);
151 }
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 public boolean complete(WriteEntry writeEntry) {
168 synchronized (writeQueue) {
169 writeEntry.markCompleted();
170
171 long nextReadValue = NONE;
172 boolean ranOnce = false;
173 while (!writeQueue.isEmpty()) {
174 ranOnce = true;
175 WriteEntry queueFirst = writeQueue.getFirst();
176
177 if (nextReadValue > 0) {
178 if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
179 throw new RuntimeException("Invariant in complete violated, nextReadValue="
180 + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
181 }
182 }
183
184 if (queueFirst.isCompleted()) {
185 nextReadValue = queueFirst.getWriteNumber();
186 writeQueue.removeFirst();
187 } else {
188 break;
189 }
190 }
191
192 if (!ranOnce) {
193 throw new RuntimeException("There is no first!");
194 }
195
196 if (nextReadValue > 0) {
197 synchronized (readWaiters) {
198 readPoint.set(nextReadValue);
199 readWaiters.notifyAll();
200 }
201 }
202 return readPoint.get() >= writeEntry.getWriteNumber();
203 }
204 }
205
206
207
208
209 void waitForRead(WriteEntry e) {
210 boolean interrupted = false;
211 int count = 0;
212 synchronized (readWaiters) {
213 while (readPoint.get() < e.getWriteNumber()) {
214 if (count % 100 == 0 && count > 0) {
215 LOG.warn("STUCK: " + this);
216 }
217 count++;
218 try {
219 readWaiters.wait(10);
220 } catch (InterruptedException ie) {
221
222
223 interrupted = true;
224 }
225 }
226 }
227 if (interrupted) {
228 Thread.currentThread().interrupt();
229 }
230 }
231
232 @VisibleForTesting
233 public String toString() {
234 return Objects.toStringHelper(this)
235 .add("readPoint", readPoint)
236 .add("writePoint", writePoint).toString();
237 }
238
239 public long getReadPoint() {
240 return readPoint.get();
241 }
242
243 @VisibleForTesting
244 public long getWritePoint() {
245 return writePoint.get();
246 }
247
248
249
250
251
252 @InterfaceAudience.Private
253 public static class WriteEntry {
254 private final long writeNumber;
255 private boolean completed = false;
256
257 WriteEntry(long writeNumber) {
258 this.writeNumber = writeNumber;
259 }
260
261 void markCompleted() {
262 this.completed = true;
263 }
264
265 boolean isCompleted() {
266 return this.completed;
267 }
268
269 public long getWriteNumber() {
270 return this.writeNumber;
271 }
272
273 @Override
274 public String toString() {
275 return this.writeNumber + ", " + this.completed;
276 }
277 }
278
279 public static final long FIXED_SIZE = ClassSize.align(
280 ClassSize.OBJECT +
281 2 * Bytes.SIZEOF_LONG +
282 2 * ClassSize.REFERENCE);
283 }