1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.fs.FSDataInputStream;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceStability;
28 import org.apache.hadoop.hbase.ProcedureInfo;
29 import org.apache.hadoop.hbase.procedure2.Procedure;
30 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
31 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
32 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
33 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
34
35
36
37
38 @InterfaceAudience.Private
39 @InterfaceStability.Evolving
40 public class ProcedureWALFormatReader {
41 private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024);
99 private final WalProcedureMap procedureMap = new WalProcedureMap(1024);
100
101
102 private long maxProcId = 0;
103
104 private final ProcedureStoreTracker tracker;
105 private final boolean hasFastStartSupport;
106
107 public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
108 this.tracker = tracker;
109
110 this.hasFastStartSupport = !tracker.isEmpty();
111 }
112
113 public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
114 FSDataInputStream stream = log.getStream();
115 try {
116 boolean hasMore = true;
117 while (hasMore) {
118 ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
119 if (entry == null) {
120 LOG.warn("nothing left to decode. exiting with missing EOF");
121 hasMore = false;
122 break;
123 }
124 switch (entry.getType()) {
125 case INIT:
126 readInitEntry(entry);
127 break;
128 case INSERT:
129 readInsertEntry(entry);
130 break;
131 case UPDATE:
132 case COMPACT:
133 readUpdateEntry(entry);
134 break;
135 case DELETE:
136 readDeleteEntry(entry);
137 break;
138 case EOF:
139 hasMore = false;
140 break;
141 default:
142 throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
143 }
144 }
145 } catch (IOException e) {
146 LOG.error("got an exception while reading the procedure WAL: " + log, e);
147 loader.markCorruptedWAL(log, e);
148 }
149
150 if (!localProcedureMap.isEmpty()) {
151 log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId());
152 procedureMap.mergeTail(localProcedureMap);
153
154
155
156
157
158
159
160
161 }
162 }
163
164 public void finalize(ProcedureWALFormat.Loader loader) throws IOException {
165
166 loader.setMaxProcId(maxProcId);
167
168
169 ProcedureIterator procIter = procedureMap.fetchReady();
170 if (procIter != null) loader.load(procIter);
171
172
173
174 procIter = procedureMap.fetchAll();
175 if (procIter != null) loader.handleCorrupted(procIter);
176 }
177
178 private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) {
179 maxProcId = Math.max(maxProcId, proc.getProcId());
180 if (isRequired(proc.getProcId())) {
181 if (LOG.isTraceEnabled()) {
182 LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
183 }
184 localProcedureMap.add(proc);
185 tracker.setDeleted(proc.getProcId(), false);
186 }
187 }
188
189 private void readInitEntry(final ProcedureWALEntry entry)
190 throws IOException {
191 assert entry.getProcedureCount() == 1 : "Expected only one procedure";
192 loadProcedure(entry, entry.getProcedure(0));
193 }
194
195 private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
196 assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
197 loadProcedure(entry, entry.getProcedure(0));
198 for (int i = 1; i < entry.getProcedureCount(); ++i) {
199 loadProcedure(entry, entry.getProcedure(i));
200 }
201 }
202
203 private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
204 assert entry.getProcedureCount() == 1 : "Expected only one procedure";
205 loadProcedure(entry, entry.getProcedure(0));
206 }
207
208 private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
209 assert entry.getProcedureCount() == 0 : "Expected no procedures";
210 assert entry.hasProcId() : "expected ProcID";
211 if (LOG.isTraceEnabled()) {
212 LOG.trace("read delete entry " + entry.getProcId());
213 }
214 maxProcId = Math.max(maxProcId, entry.getProcId());
215 localProcedureMap.remove(entry.getProcId());
216 assert !procedureMap.contains(entry.getProcId());
217 tracker.setDeleted(entry.getProcId(), true);
218 }
219
220 private boolean isDeleted(final long procId) {
221 return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
222 }
223
224 private boolean isRequired(final long procId) {
225 return !isDeleted(procId) && !procedureMap.contains(procId);
226 }
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244 private static class Entry {
245
246 protected Entry hashNext;
247
248 protected Entry childHead;
249
250 protected Entry linkNext;
251 protected Entry linkPrev;
252
253 protected Entry replayNext;
254 protected Entry replayPrev;
255
256 protected Procedure procedure;
257 protected ProcedureProtos.Procedure proto;
258 protected boolean ready = false;
259
260 public Entry(Entry hashNext) { this.hashNext = hashNext; }
261
262 public long getProcId() { return proto.getProcId(); }
263 public long getParentId() { return proto.getParentId(); }
264 public boolean hasParent() { return proto.hasParentId(); }
265 public boolean isReady() { return ready; }
266
267 public boolean isCompleted() {
268 if (!hasParent()) {
269 switch (proto.getState()) {
270 case ROLLEDBACK:
271 return true;
272 case FINISHED:
273 return !proto.hasException();
274 default:
275 break;
276 }
277 }
278 return false;
279 }
280
281 public Procedure convert() throws IOException {
282 if (procedure == null) {
283 procedure = Procedure.convert(proto);
284 }
285 return procedure;
286 }
287
288 public ProcedureInfo convertToInfo() {
289 return ProcedureInfo.convert(proto);
290 }
291
292 @Override
293 public String toString() {
294 return "Entry(" + getProcId() + ", parentId=" + getParentId() + ")";
295 }
296 }
297
298 private static class EntryIterator implements ProcedureIterator {
299 private final Entry replayHead;
300 private Entry current;
301
302 public EntryIterator(Entry replayHead) {
303 this.replayHead = replayHead;
304 this.current = replayHead;
305 }
306
307 @Override
308 public void reset() {
309 this.current = replayHead;
310 }
311
312 @Override
313 public boolean hasNext() {
314 return current != null;
315 }
316
317 @Override
318 public boolean isNextCompleted() {
319 return current != null && current.isCompleted();
320 }
321
322 @Override
323 public void skipNext() {
324 current = current.replayNext;
325 }
326
327 @Override
328 public Procedure nextAsProcedure() throws IOException {
329 try {
330 return current.convert();
331 } finally {
332 current = current.replayNext;
333 }
334 }
335
336 @Override
337 public ProcedureInfo nextAsProcedureInfo() {
338 try {
339 return current.convertToInfo();
340 } finally {
341 current = current.replayNext;
342 }
343 }
344 }
345
346 private static class WalProcedureMap {
347
348 private Entry[] procedureMap;
349
350
351 private Entry replayOrderHead;
352 private Entry replayOrderTail;
353
354
355 private Entry rootHead;
356
357
358 private Entry childUnlinkedHead;
359
360
361 private long minProcId = Long.MAX_VALUE;
362 private long maxProcId = Long.MIN_VALUE;
363
364 public WalProcedureMap(int size) {
365 procedureMap = new Entry[size];
366 replayOrderHead = null;
367 replayOrderTail = null;
368 rootHead = null;
369 childUnlinkedHead = null;
370 }
371
372 public void add(ProcedureProtos.Procedure procProto) {
373 trackProcIds(procProto.getProcId());
374 Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId());
375 boolean isNew = entry.proto == null;
376 entry.proto = procProto;
377 addToReplayList(entry);
378
379 if (isNew) {
380 if (procProto.hasParentId()) {
381 childUnlinkedHead = addToLinkList(entry, childUnlinkedHead);
382 } else {
383 rootHead = addToLinkList(entry, rootHead);
384 }
385 }
386 }
387
388 public boolean remove(long procId) {
389 trackProcIds(procId);
390 Entry entry = removeFromMap(procId);
391 if (entry != null) {
392 unlinkFromReplayList(entry);
393 unlinkFromLinkList(entry);
394 return true;
395 }
396 return false;
397 }
398
399 private void trackProcIds(long procId) {
400 minProcId = Math.min(minProcId, procId);
401 maxProcId = Math.max(maxProcId, procId);
402 }
403
404 public long getMinProcId() {
405 return minProcId;
406 }
407
408 public long getMaxProcId() {
409 return maxProcId;
410 }
411
412 public boolean contains(long procId) {
413 return getProcedure(procId) != null;
414 }
415
416 public boolean isEmpty() {
417 return replayOrderHead == null;
418 }
419
420 public void clear() {
421 for (int i = 0; i < procedureMap.length; ++i) {
422 procedureMap[i] = null;
423 }
424 replayOrderHead = null;
425 replayOrderTail = null;
426 rootHead = null;
427 childUnlinkedHead = null;
428 minProcId = Long.MAX_VALUE;
429 maxProcId = Long.MIN_VALUE;
430 }
431
432
433
434
435
436
437
438
439
440
441
442 public void mergeTail(WalProcedureMap other) {
443 for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) {
444 int slotIndex = getMapSlot(p.getProcId());
445 p.hashNext = procedureMap[slotIndex];
446 procedureMap[slotIndex] = p;
447 }
448
449 if (replayOrderHead == null) {
450 replayOrderHead = other.replayOrderHead;
451 replayOrderTail = other.replayOrderTail;
452 rootHead = other.rootHead;
453 childUnlinkedHead = other.childUnlinkedHead;
454 } else {
455
456 assert replayOrderTail.replayNext == null;
457 assert other.replayOrderHead.replayPrev == null;
458 replayOrderTail.replayNext = other.replayOrderHead;
459 other.replayOrderHead.replayPrev = replayOrderTail;
460 replayOrderTail = other.replayOrderTail;
461
462
463 if (rootHead == null) {
464 rootHead = other.rootHead;
465 } else if (other.rootHead != null) {
466 Entry otherTail = findLinkListTail(other.rootHead);
467 otherTail.linkNext = rootHead;
468 rootHead.linkPrev = otherTail;
469 rootHead = other.rootHead;
470 }
471
472
473 if (childUnlinkedHead == null) {
474 childUnlinkedHead = other.childUnlinkedHead;
475 } else if (other.childUnlinkedHead != null) {
476 Entry otherTail = findLinkListTail(other.childUnlinkedHead);
477 otherTail.linkNext = childUnlinkedHead;
478 childUnlinkedHead.linkPrev = otherTail;
479 childUnlinkedHead = other.childUnlinkedHead;
480 }
481 }
482
483 other.clear();
484 }
485
486
487
488
489
490
491 public EntryIterator fetchReady() {
492 buildGraph();
493
494 Entry readyHead = null;
495 Entry readyTail = null;
496 Entry p = replayOrderHead;
497 while (p != null) {
498 Entry next = p.replayNext;
499 if (p.isReady()) {
500 unlinkFromReplayList(p);
501 if (readyTail != null) {
502 readyTail.replayNext = p;
503 p.replayPrev = readyTail;
504 } else {
505 p.replayPrev = null;
506 readyHead = p;
507 }
508 readyTail = p;
509 p.replayNext = null;
510 }
511 p = next;
512 }
513
514
515 for (p = readyHead; p != null; p = p.replayNext) {
516 removeFromMap(p.getProcId());
517 unlinkFromLinkList(p);
518 }
519 return readyHead != null ? new EntryIterator(readyHead) : null;
520 }
521
522
523
524
525 public EntryIterator fetchAll() {
526 Entry head = replayOrderHead;
527 for (Entry p = head; p != null; p = p.replayNext) {
528 removeFromMap(p.getProcId());
529 }
530 for (int i = 0; i < procedureMap.length; ++i) {
531 assert procedureMap[i] == null : "map not empty i=" + i;
532 }
533 replayOrderHead = null;
534 replayOrderTail = null;
535 childUnlinkedHead = null;
536 rootHead = null;
537 return head != null ? new EntryIterator(head) : null;
538 }
539
540 private void buildGraph() {
541 Entry p = childUnlinkedHead;
542 while (p != null) {
543 Entry next = p.linkNext;
544 Entry rootProc = getRootProcedure(p);
545 if (rootProc != null) {
546 rootProc.childHead = addToLinkList(p, rootProc.childHead);
547 }
548 p = next;
549 }
550
551 for (p = rootHead; p != null; p = p.linkNext) {
552 checkReadyToRun(p);
553 }
554 }
555
556 private Entry getRootProcedure(Entry entry) {
557 while (entry != null && entry.hasParent()) {
558 entry = getProcedure(entry.getParentId());
559 }
560 return entry;
561 }
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602 private boolean checkReadyToRun(Entry rootEntry) {
603 int stackIdSum = 0;
604 int maxStackId = 0;
605 for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) {
606 int stackId = 1 + rootEntry.proto.getStackId(i);
607 maxStackId = Math.max(maxStackId, stackId);
608 stackIdSum += stackId;
609 }
610
611 for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
612 for (int i = 0; i < p.proto.getStackIdCount(); ++i) {
613 int stackId = 1 + p.proto.getStackId(i);
614 maxStackId = Math.max(maxStackId, stackId);
615 stackIdSum += stackId;
616 }
617 }
618 final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2);
619 if (cmpStackIdSum == stackIdSum) {
620 rootEntry.ready = true;
621 for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) {
622 p.ready = true;
623 }
624 return true;
625 }
626 return false;
627 }
628
629 private void unlinkFromReplayList(Entry entry) {
630 if (replayOrderHead == entry) {
631 replayOrderHead = entry.replayNext;
632 }
633 if (replayOrderTail == entry) {
634 replayOrderTail = entry.replayPrev;
635 }
636 if (entry.replayPrev != null) {
637 entry.replayPrev.replayNext = entry.replayNext;
638 }
639 if (entry.replayNext != null) {
640 entry.replayNext.replayPrev = entry.replayPrev;
641 }
642 }
643
644 private void addToReplayList(final Entry entry) {
645 unlinkFromReplayList(entry);
646 entry.replayNext = replayOrderHead;
647 entry.replayPrev = null;
648 if (replayOrderHead != null) {
649 replayOrderHead.replayPrev = entry;
650 } else {
651 replayOrderTail = entry;
652 }
653 replayOrderHead = entry;
654 }
655
656 private void unlinkFromLinkList(Entry entry) {
657 if (entry == rootHead) {
658 rootHead = entry.linkNext;
659 } else if (entry == childUnlinkedHead) {
660 childUnlinkedHead = entry.linkNext;
661 }
662 if (entry.linkPrev != null) {
663 entry.linkPrev.linkNext = entry.linkNext;
664 }
665 if (entry.linkNext != null) {
666 entry.linkNext.linkPrev = entry.linkPrev;
667 }
668 }
669
670 private Entry addToLinkList(Entry entry, Entry linkHead) {
671 unlinkFromLinkList(entry);
672 entry.linkNext = linkHead;
673 entry.linkPrev = null;
674 if (linkHead != null) {
675 linkHead.linkPrev = entry;
676 }
677 return entry;
678 }
679
680 private Entry findLinkListTail(Entry linkHead) {
681 Entry tail = linkHead;
682 while (tail.linkNext != null) {
683 tail = tail.linkNext;
684 }
685 return tail;
686 }
687
688 private Entry addToMap(final long procId, final boolean hasParent) {
689 int slotIndex = getMapSlot(procId);
690 Entry entry = getProcedure(slotIndex, procId);
691 if (entry != null) return entry;
692
693 entry = new Entry(procedureMap[slotIndex]);
694 procedureMap[slotIndex] = entry;
695 return entry;
696 }
697
698 private Entry removeFromMap(final long procId) {
699 int slotIndex = getMapSlot(procId);
700 Entry prev = null;
701 Entry entry = procedureMap[slotIndex];
702 while (entry != null) {
703 if (procId == entry.getProcId()) {
704 if (prev != null) {
705 prev.hashNext = entry.hashNext;
706 } else {
707 procedureMap[slotIndex] = entry.hashNext;
708 }
709 entry.hashNext = null;
710 return entry;
711 }
712 prev = entry;
713 entry = entry.hashNext;
714 }
715 return null;
716 }
717
718 private Entry getProcedure(final long procId) {
719 return getProcedure(getMapSlot(procId), procId);
720 }
721
722 private Entry getProcedure(final int slotIndex, final long procId) {
723 Entry entry = procedureMap[slotIndex];
724 while (entry != null) {
725 if (procId == entry.getProcId()) {
726 return entry;
727 }
728 entry = entry.hashNext;
729 }
730 return null;
731 }
732
733 private int getMapSlot(final long procId) {
734 return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length);
735 }
736 }
737 }