1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.lang.reflect.Constructor;
25 import java.lang.reflect.Modifier;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.Map;
29
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.ProcedureInfo;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
36 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
37 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
38 import org.apache.hadoop.hbase.util.ByteStringer;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40 import org.apache.hadoop.hbase.util.NonceKey;
41
42 import com.google.common.annotations.VisibleForTesting;
43 import com.google.common.base.Preconditions;
44 import com.google.protobuf.ByteString;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 @InterfaceStability.Evolving
67 public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
68
69 private String owner = null;
70 private Long parentProcId = null;
71 private Long procId = null;
72 private long startTime;
73
74
75 private ProcedureState state = ProcedureState.INITIALIZING;
76 private Integer timeout = null;
77 private int[] stackIndexes = null;
78 private int childrenLatch = 0;
79 private long lastUpdate;
80
81 private RemoteProcedureException exception = null;
82 private byte[] result = null;
83
84 private NonceKey nonceKey = null;
85
86
87
88
89
90
91
92
93
94
95 protected abstract Procedure[] execute(TEnvironment env)
96 throws ProcedureYieldException, InterruptedException;
97
98
99
100
101
102
103
104
105
106
107
108
109 protected abstract void rollback(TEnvironment env)
110 throws IOException, InterruptedException;
111
112
113
114
115
116
117
118
119
120
121
122
123
124 protected abstract boolean abort(TEnvironment env);
125
126
127
128
129
130
131 protected abstract void serializeStateData(final OutputStream stream)
132 throws IOException;
133
134
135
136
137
138
139 protected abstract void deserializeStateData(final InputStream stream)
140 throws IOException;
141
142
143
144
145
146
147
148
149
150
151 protected boolean acquireLock(final TEnvironment env) {
152 return true;
153 }
154
155
156
157
158 protected void releaseLock(final TEnvironment env) {
159
160 }
161
162
163
164
165
166
167
168 protected void beforeReplay(final TEnvironment env) {
169
170 }
171
172
173
174
175
176
177 protected void completionCleanup(final TEnvironment env) {
178
179 }
180
181
182
183
184
185
186
187
188
189 protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
190 return false;
191 }
192
193
194
195
196
197
198
199
200
201
202 protected boolean shouldWaitClientAck(final TEnvironment env) {
203 return true;
204 }
205
206 @Override
207 public String toString() {
208
209 return toStringSimpleSB().toString();
210 }
211
212
213
214
215
216
217 protected StringBuilder toStringSimpleSB() {
218 StringBuilder sb = new StringBuilder();
219 toStringClassDetails(sb);
220
221 if (procId != null) {
222 sb.append(" id=");
223 sb.append(getProcId());
224 }
225
226 if (hasParent()) {
227 sb.append(" parent=");
228 sb.append(getParentProcId());
229 }
230
231 if (hasOwner()) {
232 sb.append(" owner=");
233 sb.append(getOwner());
234 }
235
236 sb.append(" state=");
237 toStringState(sb);
238
239 return sb;
240 }
241
242
243
244
245
246 public String toStringDetails() {
247 StringBuilder sb = toStringSimpleSB();
248
249 sb.append(" startTime=");
250 sb.append(getStartTime());
251
252 sb.append(" lastUpdate=");
253 sb.append(getLastUpdate());
254
255 if (stackIndexes != null) {
256 sb.append("\n");
257 sb.append("stackIndexes=");
258 sb.append(Arrays.toString(getStackIndexes()));
259 }
260
261 return sb.toString();
262 }
263
264 protected String toStringClass() {
265 StringBuilder sb = new StringBuilder();
266 toStringClassDetails(sb);
267
268 return sb.toString();
269 }
270
271
272
273
274
275 protected void toStringState(StringBuilder builder) {
276 builder.append(getState());
277 }
278
279
280
281
282
283
284 protected void toStringClassDetails(StringBuilder builder) {
285 builder.append(getClass().getName());
286 }
287
288
289
290
291 public byte[] getResult() {
292 return result;
293 }
294
295
296
297
298
299 protected void setResult(final byte[] result) {
300 this.result = result;
301 }
302
303 public long getProcId() {
304 return procId;
305 }
306
307 public boolean hasParent() {
308 return parentProcId != null;
309 }
310
311 public boolean hasException() {
312 return exception != null;
313 }
314
315 public boolean hasTimeout() {
316 return timeout != null;
317 }
318
319 public long getParentProcId() {
320 return parentProcId;
321 }
322
323 public NonceKey getNonceKey() {
324 return nonceKey;
325 }
326
327
328
329
330
331 public synchronized boolean isFailed() {
332 return exception != null || state == ProcedureState.ROLLEDBACK;
333 }
334
335
336
337
338 public synchronized boolean isSuccess() {
339 return state == ProcedureState.FINISHED && exception == null;
340 }
341
342
343
344
345
346 public synchronized boolean isFinished() {
347 switch (state) {
348 case ROLLEDBACK:
349 return true;
350 case FINISHED:
351 return exception == null;
352 default:
353 break;
354 }
355 return false;
356 }
357
358
359
360
361 public synchronized boolean isWaiting() {
362 switch (state) {
363 case WAITING:
364 case WAITING_TIMEOUT:
365 return true;
366 default:
367 break;
368 }
369 return false;
370 }
371
372 public synchronized RemoteProcedureException getException() {
373 return exception;
374 }
375
376 public long getStartTime() {
377 return startTime;
378 }
379
380 public synchronized long getLastUpdate() {
381 return lastUpdate;
382 }
383
384 public synchronized long elapsedTime() {
385 return lastUpdate - startTime;
386 }
387
388
389
390
391 protected void setTimeout(final int timeout) {
392 this.timeout = timeout;
393 }
394
395
396
397
398 public int getTimeout() {
399 return timeout;
400 }
401
402
403
404
405 public long getTimeRemaining() {
406 return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
407 }
408
409 @VisibleForTesting
410 @InterfaceAudience.Private
411 public void setOwner(final String owner) {
412 this.owner = StringUtils.isEmpty(owner) ? null : owner;
413 }
414
415 public String getOwner() {
416 return owner;
417 }
418
419 public boolean hasOwner() {
420 return owner != null;
421 }
422
423 @VisibleForTesting
424 @InterfaceAudience.Private
425 protected synchronized void setState(final ProcedureState state) {
426 this.state = state;
427 updateTimestamp();
428 }
429
430 @InterfaceAudience.Private
431 protected synchronized ProcedureState getState() {
432 return state;
433 }
434
435 protected void setFailure(final String source, final Throwable cause) {
436 setFailure(new RemoteProcedureException(source, cause));
437 }
438
439 protected synchronized void setFailure(final RemoteProcedureException exception) {
440 this.exception = exception;
441 if (!isFinished()) {
442 setState(ProcedureState.FINISHED);
443 }
444 }
445
446 protected void setAbortFailure(final String source, final String msg) {
447 setFailure(source, new ProcedureAbortedException(msg));
448 }
449
450 @InterfaceAudience.Private
451 protected synchronized boolean setTimeoutFailure() {
452 if (state == ProcedureState.WAITING_TIMEOUT) {
453 long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
454 setFailure("ProcedureExecutor", new TimeoutIOException(
455 "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
456 return true;
457 }
458 return false;
459 }
460
461
462
463
464 @VisibleForTesting
465 @InterfaceAudience.Private
466 protected void setProcId(final long procId) {
467 this.procId = procId;
468 this.startTime = EnvironmentEdgeManager.currentTime();
469 setState(ProcedureState.RUNNABLE);
470 }
471
472
473
474
475 @InterfaceAudience.Private
476 protected void setParentProcId(final long parentProcId) {
477 this.parentProcId = parentProcId;
478 }
479
480
481
482
483 @VisibleForTesting
484 @InterfaceAudience.Private
485 protected void setNonceKey(final NonceKey nonceKey) {
486 this.nonceKey = nonceKey;
487 }
488
489
490
491
492
493 @InterfaceAudience.Private
494 protected Procedure[] doExecute(final TEnvironment env)
495 throws ProcedureYieldException, InterruptedException {
496 try {
497 updateTimestamp();
498 return execute(env);
499 } finally {
500 updateTimestamp();
501 }
502 }
503
504
505
506
507
508 @InterfaceAudience.Private
509 protected void doRollback(final TEnvironment env)
510 throws IOException, InterruptedException {
511 try {
512 updateTimestamp();
513 rollback(env);
514 } finally {
515 updateTimestamp();
516 }
517 }
518
519
520
521
522
523 @InterfaceAudience.Private
524 protected void setStartTime(final long startTime) {
525 this.startTime = startTime;
526 }
527
528
529
530
531
532 private synchronized void setLastUpdate(final long lastUpdate) {
533 this.lastUpdate = lastUpdate;
534 }
535
536 protected synchronized void updateTimestamp() {
537 this.lastUpdate = EnvironmentEdgeManager.currentTime();
538 }
539
540
541
542
543 @InterfaceAudience.Private
544 protected synchronized void setChildrenLatch(final int numChildren) {
545 this.childrenLatch = numChildren;
546 }
547
548
549
550
551 @InterfaceAudience.Private
552 protected synchronized void incChildrenLatch() {
553
554 this.childrenLatch++;
555 }
556
557
558
559
560
561 @InterfaceAudience.Private
562 protected synchronized boolean childrenCountDown() {
563 assert childrenLatch > 0;
564 return --childrenLatch == 0;
565 }
566
567
568
569
570
571 @InterfaceAudience.Private
572 protected synchronized void addStackIndex(final int index) {
573 if (stackIndexes == null) {
574 stackIndexes = new int[] { index };
575 } else {
576 int count = stackIndexes.length;
577 stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
578 stackIndexes[count] = index;
579 }
580 }
581
582 @InterfaceAudience.Private
583 protected synchronized boolean removeStackIndex() {
584 if (stackIndexes.length > 1) {
585 stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
586 return false;
587 } else {
588 stackIndexes = null;
589 return true;
590 }
591 }
592
593
594
595
596
597 @InterfaceAudience.Private
598 protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
599 this.stackIndexes = new int[stackIndexes.size()];
600 for (int i = 0; i < this.stackIndexes.length; ++i) {
601 this.stackIndexes[i] = stackIndexes.get(i);
602 }
603 }
604
605 @InterfaceAudience.Private
606 protected synchronized boolean wasExecuted() {
607 return stackIndexes != null;
608 }
609
610 @InterfaceAudience.Private
611 protected synchronized int[] getStackIndexes() {
612 return stackIndexes;
613 }
614
615 @Override
616 public int compareTo(final Procedure other) {
617 long diff = getProcId() - other.getProcId();
618 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
619 }
620
621
622
623
624
625 public static long getProcIdHashCode(final long procId) {
626 long h = procId;
627 h ^= h >> 16;
628 h *= 0x85ebca6b;
629 h ^= h >> 13;
630 h *= 0xc2b2ae35;
631 h ^= h >> 16;
632 return h;
633 }
634
635
636
637
638 @InterfaceAudience.Private
639 protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
640 while (proc.hasParent()) {
641 proc = procedures.get(proc.getParentProcId());
642 if (proc == null) return null;
643 }
644 return proc.getProcId();
645 }
646
647 protected static Procedure newInstance(final String className) throws IOException {
648 try {
649 Class<?> clazz = Class.forName(className);
650 if (!Modifier.isPublic(clazz.getModifiers())) {
651 throw new Exception("the " + clazz + " class is not public");
652 }
653
654 Constructor<?> ctor = clazz.getConstructor();
655 assert ctor != null : "no constructor found";
656 if (!Modifier.isPublic(ctor.getModifiers())) {
657 throw new Exception("the " + clazz + " constructor is not public");
658 }
659 return (Procedure)ctor.newInstance();
660 } catch (Exception e) {
661 throw new IOException("The procedure class " + className +
662 " must be accessible and have an empty constructor", e);
663 }
664 }
665
666 protected static void validateClass(final Procedure proc) throws IOException {
667 try {
668 Class<?> clazz = proc.getClass();
669 if (!Modifier.isPublic(clazz.getModifiers())) {
670 throw new Exception("the " + clazz + " class is not public");
671 }
672
673 Constructor<?> ctor = clazz.getConstructor();
674 assert ctor != null;
675 if (!Modifier.isPublic(ctor.getModifiers())) {
676 throw new Exception("the " + clazz + " constructor is not public");
677 }
678 } catch (Exception e) {
679 throw new IOException("The procedure class " + proc.getClass().getName() +
680 " must be accessible and have an empty constructor", e);
681 }
682 }
683
684
685
686
687 @InterfaceAudience.Private
688 public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
689 RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
690 return new ProcedureInfo(
691 proc.getProcId(),
692 proc.toStringClass(),
693 proc.getOwner(),
694 proc.getState(),
695 proc.hasParent() ? proc.getParentProcId() : -1,
696 nonceKey,
697 exception != null ?
698 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
699 proc.getLastUpdate(),
700 proc.getStartTime(),
701 proc.getResult());
702 }
703
704
705
706
707
708 @InterfaceAudience.Private
709 public static ProcedureProtos.Procedure convert(final Procedure proc)
710 throws IOException {
711 Preconditions.checkArgument(proc != null);
712 validateClass(proc);
713
714 ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
715 .setClassName(proc.getClass().getName())
716 .setProcId(proc.getProcId())
717 .setState(proc.getState())
718 .setStartTime(proc.getStartTime())
719 .setLastUpdate(proc.getLastUpdate());
720
721 if (proc.hasParent()) {
722 builder.setParentId(proc.getParentProcId());
723 }
724
725 if (proc.hasTimeout()) {
726 builder.setTimeout(proc.getTimeout());
727 }
728
729 if (proc.hasOwner()) {
730 builder.setOwner(proc.getOwner());
731 }
732
733 int[] stackIds = proc.getStackIndexes();
734 if (stackIds != null) {
735 for (int i = 0; i < stackIds.length; ++i) {
736 builder.addStackId(stackIds[i]);
737 }
738 }
739
740 if (proc.hasException()) {
741 RemoteProcedureException exception = proc.getException();
742 builder.setException(
743 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
744 }
745
746 byte[] result = proc.getResult();
747 if (result != null) {
748 builder.setResult(ByteStringer.wrap(result));
749 }
750
751 ByteString.Output stateStream = ByteString.newOutput();
752 proc.serializeStateData(stateStream);
753 if (stateStream.size() > 0) {
754 builder.setStateData(stateStream.toByteString());
755 }
756
757 if (proc.getNonceKey() != null) {
758 builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
759 builder.setNonce(proc.getNonceKey().getNonce());
760 }
761
762 return builder.build();
763 }
764
765
766
767
768
769
770
771
772
773
774 @InterfaceAudience.Private
775 public static Procedure convert(final ProcedureProtos.Procedure proto)
776 throws IOException {
777
778 Procedure proc = Procedure.newInstance(proto.getClassName());
779
780
781 proc.setProcId(proto.getProcId());
782 proc.setState(proto.getState());
783 proc.setStartTime(proto.getStartTime());
784 proc.setLastUpdate(proto.getLastUpdate());
785
786 if (proto.hasParentId()) {
787 proc.setParentProcId(proto.getParentId());
788 }
789
790 if (proto.hasOwner()) {
791 proc.setOwner(proto.getOwner());
792 }
793
794 if (proto.hasTimeout()) {
795 proc.setTimeout(proto.getTimeout());
796 }
797
798 if (proto.getStackIdCount() > 0) {
799 proc.setStackIndexes(proto.getStackIdList());
800 }
801
802 if (proto.hasException()) {
803 assert proc.getState() == ProcedureState.FINISHED ||
804 proc.getState() == ProcedureState.ROLLEDBACK :
805 "The procedure must be failed (waiting to rollback) or rolledback";
806 proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
807 }
808
809 if (proto.hasResult()) {
810 proc.setResult(proto.getResult().toByteArray());
811 }
812
813 if (proto.getNonce() != HConstants.NO_NONCE) {
814 NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
815 proc.setNonceKey(nonceKey);
816 }
817
818
819 proc.deserializeStateData(proto.getStateData().newInput());
820
821 return proc;
822 }
823 }