1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.util.Iterator;
25 import java.util.Map;
26 import java.util.TreeMap;
27
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.hbase.classification.InterfaceStability;
30 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
31
32
33
34
35
36
37
38 @InterfaceAudience.Private
39 @InterfaceStability.Evolving
40 public class ProcedureStoreTracker {
41 private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>();
42
43 private boolean keepDeletes = false;
44 private boolean partial = false;
45
46 private long minUpdatedProcId = Long.MAX_VALUE;
47 private long maxUpdatedProcId = Long.MIN_VALUE;
48
49 public enum DeleteState { YES, NO, MAYBE }
50
51 public static class BitSetNode {
52 private final static long WORD_MASK = 0xffffffffffffffffL;
53 private final static int ADDRESS_BITS_PER_WORD = 6;
54 private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
55 private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
56
57 private final boolean partial;
58 private long[] updated;
59 private long[] deleted;
60 private long start;
61
62 public void dump() {
63 System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
64 getMinProcId(), getMaxProcId());
65 System.out.println("Update:");
66 for (int i = 0; i < updated.length; ++i) {
67 for (int j = 0; j < BITS_PER_WORD; ++j) {
68 System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0");
69 }
70 System.out.println(" " + i);
71 }
72 System.out.println();
73 System.out.println("Delete:");
74 for (int i = 0; i < deleted.length; ++i) {
75 for (int j = 0; j < BITS_PER_WORD; ++j) {
76 System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
77 }
78 System.out.println(" " + i);
79 }
80 System.out.println();
81 }
82
83 public BitSetNode(final long procId, final boolean partial) {
84 start = alignDown(procId);
85
86 int count = 1;
87 updated = new long[count];
88 deleted = new long[count];
89 for (int i = 0; i < count; ++i) {
90 updated[i] = 0;
91 deleted[i] = partial ? 0 : WORD_MASK;
92 }
93
94 this.partial = partial;
95 updateState(procId, false);
96 }
97
98 protected BitSetNode(final long start, final long[] updated, final long[] deleted) {
99 this.start = start;
100 this.updated = updated;
101 this.deleted = deleted;
102 this.partial = false;
103 }
104
105 public void update(final long procId) {
106 updateState(procId, false);
107 }
108
109 public void delete(final long procId) {
110 updateState(procId, true);
111 }
112
113 public Long getStart() {
114 return start;
115 }
116
117 public Long getEnd() {
118 return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
119 }
120
121 public boolean contains(final long procId) {
122 return start <= procId && procId <= getEnd();
123 }
124
125 public DeleteState isDeleted(final long procId) {
126 int bitmapIndex = getBitmapIndex(procId);
127 int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
128 if (wordIndex >= deleted.length) {
129 return DeleteState.MAYBE;
130 }
131 return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
132 }
133
134 private boolean isUpdated(final long procId) {
135 int bitmapIndex = getBitmapIndex(procId);
136 int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
137 if (wordIndex >= updated.length) {
138 return false;
139 }
140 return (updated[wordIndex] & (1L << bitmapIndex)) != 0;
141 }
142
143 public boolean isUpdated() {
144
145 for (int i = 0; i < updated.length; ++i) {
146 if ((updated[i] | deleted[i]) != WORD_MASK) {
147 return false;
148 }
149 }
150 return true;
151 }
152
153 public boolean isEmpty() {
154
155 for (int i = 0; i < deleted.length; ++i) {
156 if (deleted[i] != WORD_MASK) {
157 return false;
158 }
159 }
160 return true;
161 }
162
163 public void resetUpdates() {
164 for (int i = 0; i < updated.length; ++i) {
165 updated[i] = 0;
166 }
167 }
168
169 public void undeleteAll() {
170 for (int i = 0; i < updated.length; ++i) {
171 deleted[i] = 0;
172 }
173 }
174
175 public void unsetPartialFlag() {
176 for (int i = 0; i < updated.length; ++i) {
177 for (int j = 0; j < BITS_PER_WORD; ++j) {
178 if ((updated[i] & (1L << j)) == 0) {
179 deleted[i] |= (1L << j);
180 }
181 }
182 }
183 }
184
185 public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
186 ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
187 ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
188 builder.setStartId(start);
189 for (int i = 0; i < updated.length; ++i) {
190 builder.addUpdated(updated[i]);
191 builder.addDeleted(deleted[i]);
192 }
193 return builder.build();
194 }
195
196 public static BitSetNode convert(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
197 long start = data.getStartId();
198 int size = data.getUpdatedCount();
199 long[] updated = new long[size];
200 long[] deleted = new long[size];
201 for (int i = 0; i < size; ++i) {
202 updated[i] = data.getUpdated(i);
203 deleted[i] = data.getDeleted(i);
204 }
205 return new BitSetNode(start, updated, deleted);
206 }
207
208
209
210
211 public boolean canGrow(final long procId) {
212 return Math.abs(procId - start) < MAX_NODE_SIZE;
213 }
214
215 public boolean canMerge(final BitSetNode rightNode) {
216 assert start < rightNode.getEnd();
217 return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
218 }
219
220 public void grow(final long procId) {
221 int delta, offset;
222
223 if (procId < start) {
224
225 long newStart = alignDown(procId);
226 delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD;
227 offset = delta;
228 start = newStart;
229 } else {
230
231 long newEnd = alignUp(procId + 1);
232 delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
233 offset = 0;
234 }
235
236 long[] newBitmap;
237 int oldSize = updated.length;
238
239 newBitmap = new long[oldSize + delta];
240 for (int i = 0; i < newBitmap.length; ++i) {
241 newBitmap[i] = 0;
242 }
243 System.arraycopy(updated, 0, newBitmap, offset, oldSize);
244 updated = newBitmap;
245
246 newBitmap = new long[deleted.length + delta];
247 for (int i = 0; i < newBitmap.length; ++i) {
248 newBitmap[i] = partial ? 0 : WORD_MASK;
249 }
250 System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
251 deleted = newBitmap;
252 }
253
254 public void merge(final BitSetNode rightNode) {
255 int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
256
257 long[] newBitmap;
258 int oldSize = updated.length;
259 int newSize = (delta - rightNode.updated.length);
260 int offset = oldSize + newSize;
261
262 newBitmap = new long[oldSize + delta];
263 System.arraycopy(updated, 0, newBitmap, 0, oldSize);
264 System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length);
265 updated = newBitmap;
266
267 newBitmap = new long[oldSize + delta];
268 System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
269 System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
270 deleted = newBitmap;
271
272 for (int i = 0; i < newSize; ++i) {
273 updated[offset + i] = 0;
274 deleted[offset + i] = partial ? 0 : WORD_MASK;
275 }
276 }
277
278 @Override
279 public String toString() {
280 return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
281 }
282
283
284
285
286 public long getMinProcId() {
287 long minProcId = start;
288 for (int i = 0; i < deleted.length; ++i) {
289 if (deleted[i] == 0) {
290 return(minProcId);
291 }
292
293 if (deleted[i] != WORD_MASK) {
294 for (int j = 0; j < BITS_PER_WORD; ++j) {
295 if ((deleted[i] & (1L << j)) != 0) {
296 return minProcId + j;
297 }
298 }
299 }
300
301 minProcId += BITS_PER_WORD;
302 }
303 return minProcId;
304 }
305
306 public long getMaxProcId() {
307 long maxProcId = getEnd();
308 for (int i = deleted.length - 1; i >= 0; --i) {
309 if (deleted[i] == 0) {
310 return maxProcId;
311 }
312
313 if (deleted[i] != WORD_MASK) {
314 for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
315 if ((deleted[i] & (1L << j)) == 0) {
316 return maxProcId - (BITS_PER_WORD - 1 - j);
317 }
318 }
319 }
320 maxProcId -= BITS_PER_WORD;
321 }
322 return maxProcId;
323 }
324
325
326
327
328 private int getBitmapIndex(final long procId) {
329 return (int)(procId - start);
330 }
331
332 private void updateState(final long procId, final boolean isDeleted) {
333 int bitmapIndex = getBitmapIndex(procId);
334 int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
335 long value = (1L << bitmapIndex);
336
337 if (isDeleted) {
338 updated[wordIndex] |= value;
339 deleted[wordIndex] |= value;
340 } else {
341 updated[wordIndex] |= value;
342 deleted[wordIndex] &= ~value;
343 }
344 }
345
346
347
348
349 private static long alignUp(final long x) {
350 return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
351 }
352
353 private static long alignDown(final long x) {
354 return x & -BITS_PER_WORD;
355 }
356 }
357
358 public void insert(long procId) {
359 BitSetNode node = getOrCreateNode(procId);
360 node.update(procId);
361 trackProcIds(procId);
362 }
363
364 public void insert(final long procId, final long[] subProcIds) {
365 update(procId);
366 for (int i = 0; i < subProcIds.length; ++i) {
367 insert(subProcIds[i]);
368 }
369 }
370
371 public void update(long procId) {
372 Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
373 assert entry != null : "expected node to update procId=" + procId;
374
375 BitSetNode node = entry.getValue();
376 assert node.contains(procId);
377 node.update(procId);
378 trackProcIds(procId);
379 }
380
381 public void delete(long procId) {
382 Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
383 assert entry != null : "expected node to delete procId=" + procId;
384
385 BitSetNode node = entry.getValue();
386 assert node.contains(procId) : "expected procId in the node";
387 node.delete(procId);
388
389 if (!keepDeletes && node.isEmpty()) {
390
391 map.remove(entry.getKey());
392 }
393
394 trackProcIds(procId);
395 }
396
397 private void trackProcIds(long procId) {
398 minUpdatedProcId = Math.min(minUpdatedProcId, procId);
399 maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
400 }
401
402 public long getUpdatedMinProcId() {
403 return minUpdatedProcId;
404 }
405
406 public long getUpdatedMaxProcId() {
407 return maxUpdatedProcId;
408 }
409
410 @InterfaceAudience.Private
411 public void setDeleted(final long procId, final boolean isDeleted) {
412 BitSetNode node = getOrCreateNode(procId);
413 assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
414 node.updateState(procId, isDeleted);
415 }
416
417 public void reset() {
418 this.keepDeletes = false;
419 this.partial = false;
420 this.map.clear();
421 resetUpdates();
422 }
423
424 public DeleteState isDeleted(long procId) {
425 Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
426 if (entry != null && entry.getValue().contains(procId)) {
427 BitSetNode node = entry.getValue();
428 DeleteState state = node.isDeleted(procId);
429 return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
430 }
431 return partial ? DeleteState.MAYBE : DeleteState.YES;
432 }
433
434 public long getMinProcId() {
435
436 Map.Entry<Long, BitSetNode> entry = map.firstEntry();
437 return entry == null ? 0 : entry.getValue().getMinProcId();
438 }
439
440 public void setKeepDeletes(boolean keepDeletes) {
441 this.keepDeletes = keepDeletes;
442 if (!keepDeletes) {
443 Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
444 while (it.hasNext()) {
445 Map.Entry<Long, BitSetNode> entry = it.next();
446 if (entry.getValue().isEmpty()) {
447 it.remove();
448 }
449 }
450 }
451 }
452
453 public void setPartialFlag(boolean isPartial) {
454 if (this.partial && !isPartial) {
455 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
456 entry.getValue().unsetPartialFlag();
457 }
458 }
459 this.partial = isPartial;
460 }
461
462 public boolean isEmpty() {
463 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
464 if (entry.getValue().isEmpty() == false) {
465 return false;
466 }
467 }
468 return true;
469 }
470
471 public boolean isUpdated() {
472 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
473 if (entry.getValue().isUpdated() == false) {
474 return false;
475 }
476 }
477 return true;
478 }
479
480 public boolean isTracking(long minId, long maxId) {
481
482 return map.floorEntry(minId) != null || map.floorEntry(maxId) != null;
483 }
484
485 public void resetUpdates() {
486 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
487 entry.getValue().resetUpdates();
488 }
489 minUpdatedProcId = Long.MAX_VALUE;
490 maxUpdatedProcId = Long.MIN_VALUE;
491 }
492
493 public void undeleteAll() {
494 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
495 entry.getValue().undeleteAll();
496 }
497 }
498
499 private BitSetNode getOrCreateNode(final long procId) {
500
501 BitSetNode leftNode = null;
502 boolean leftCanGrow = false;
503 Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
504 if (leftEntry != null) {
505 leftNode = leftEntry.getValue();
506 if (leftNode.contains(procId)) {
507 return leftNode;
508 }
509 leftCanGrow = leftNode.canGrow(procId);
510 }
511
512 BitSetNode rightNode = null;
513 boolean rightCanGrow = false;
514 Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
515 if (rightEntry != null) {
516 rightNode = rightEntry.getValue();
517 rightCanGrow = rightNode.canGrow(procId);
518 if (leftNode != null) {
519 if (leftNode.canMerge(rightNode)) {
520
521 return mergeNodes(leftNode, rightNode);
522 }
523
524 if (leftCanGrow && rightCanGrow) {
525 if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
526
527 return growNode(leftNode, procId);
528 }
529
530 return growNode(rightNode, procId);
531 }
532 }
533 }
534
535
536 if (leftCanGrow) {
537 return growNode(leftNode, procId);
538 }
539
540
541 if (rightCanGrow) {
542 return growNode(rightNode, procId);
543 }
544
545
546 BitSetNode node = new BitSetNode(procId, partial);
547 map.put(node.getStart(), node);
548 return node;
549 }
550
551 private BitSetNode growNode(BitSetNode node, long procId) {
552 map.remove(node.getStart());
553 node.grow(procId);
554 map.put(node.getStart(), node);
555 return node;
556 }
557
558 private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
559 assert leftNode.getStart() < rightNode.getStart();
560 leftNode.merge(rightNode);
561 map.remove(rightNode.getStart());
562 return leftNode;
563 }
564
565 public void dump() {
566 System.out.println("map " + map.size());
567 System.out.println("isUpdated " + isUpdated());
568 System.out.println("isEmpty " + isEmpty());
569 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
570 entry.getValue().dump();
571 }
572 }
573
574 public void writeTo(final OutputStream stream) throws IOException {
575 ProcedureProtos.ProcedureStoreTracker.Builder builder =
576 ProcedureProtos.ProcedureStoreTracker.newBuilder();
577 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
578 builder.addNode(entry.getValue().convert());
579 }
580 builder.build().writeDelimitedTo(stream);
581 }
582
583 public void readFrom(final InputStream stream) throws IOException {
584 reset();
585 final ProcedureProtos.ProcedureStoreTracker data =
586 ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
587 for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
588 final BitSetNode node = BitSetNode.convert(protoNode);
589 map.put(node.getStart(), node);
590 }
591 }
592 }