1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.lang.reflect.Constructor;
25 import java.lang.reflect.Modifier;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.Map;
29
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.ProcedureInfo;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.classification.InterfaceStability;
34 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
36 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
37 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
38 import org.apache.hadoop.hbase.util.ByteStringer;
39 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40 import org.apache.hadoop.hbase.util.NonceKey;
41
42 import com.google.common.annotations.VisibleForTesting;
43 import com.google.common.base.Preconditions;
44 import com.google.protobuf.ByteString;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 @InterfaceStability.Evolving
67 public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
68
69 private String owner = null;
70 private Long parentProcId = null;
71 private Long procId = null;
72 private long startTime;
73
74
75 private ProcedureState state = ProcedureState.INITIALIZING;
76 private Integer timeout = null;
77 private int[] stackIndexes = null;
78 private int childrenLatch = 0;
79 private long lastUpdate;
80
81 private RemoteProcedureException exception = null;
82 private byte[] result = null;
83
84 private NonceKey nonceKey = null;
85
86
87
88
89
90
91
92 protected abstract Procedure[] execute(TEnvironment env)
93 throws ProcedureYieldException;
94
95
96
97
98
99
100
101
102
103
104 protected abstract void rollback(TEnvironment env)
105 throws IOException;
106
107
108
109
110
111
112
113
114
115
116
117
118
119 protected abstract boolean abort(TEnvironment env);
120
121
122
123
124
125
126 protected abstract void serializeStateData(final OutputStream stream)
127 throws IOException;
128
129
130
131
132
133
134 protected abstract void deserializeStateData(final InputStream stream)
135 throws IOException;
136
137
138
139
140
141
142
143
144
145
146 protected boolean acquireLock(final TEnvironment env) {
147 return true;
148 }
149
150
151
152
153 protected void releaseLock(final TEnvironment env) {
154
155 }
156
157
158
159
160
161
162
163 protected void beforeReplay(final TEnvironment env) {
164
165 }
166
167
168
169
170
171
172 protected void completionCleanup(final TEnvironment env) {
173
174 }
175
176 @Override
177 public String toString() {
178
179 return toStringSimpleSB().toString();
180 }
181
182
183
184
185
186
187 protected StringBuilder toStringSimpleSB() {
188 StringBuilder sb = new StringBuilder();
189 toStringClassDetails(sb);
190
191 if (procId != null) {
192 sb.append(" id=");
193 sb.append(getProcId());
194 }
195
196 if (hasParent()) {
197 sb.append(" parent=");
198 sb.append(getParentProcId());
199 }
200
201 if (hasOwner()) {
202 sb.append(" owner=");
203 sb.append(getOwner());
204 }
205
206 sb.append(" state=");
207 sb.append(getState());
208
209 return sb;
210 }
211
212
213
214
215
216 public String toStringDetails() {
217 StringBuilder sb = toStringSimpleSB();
218
219 sb.append(" startTime=");
220 sb.append(getStartTime());
221
222 sb.append(" lastUpdate=");
223 sb.append(getLastUpdate());
224
225 if (stackIndexes != null) {
226 sb.append("\n");
227 sb.append("stackIndexes=");
228 sb.append(Arrays.toString(getStackIndexes()));
229 }
230
231 return sb.toString();
232 }
233
234 protected String toStringClass() {
235 StringBuilder sb = new StringBuilder();
236 toStringClassDetails(sb);
237
238 return sb.toString();
239 }
240
241
242
243
244
245
246 protected void toStringClassDetails(StringBuilder builder) {
247 builder.append(getClass().getName());
248 }
249
250
251
252
253 public byte[] getResult() {
254 return result;
255 }
256
257
258
259
260
261 protected void setResult(final byte[] result) {
262 this.result = result;
263 }
264
265 public long getProcId() {
266 return procId;
267 }
268
269 public boolean hasParent() {
270 return parentProcId != null;
271 }
272
273 public boolean hasException() {
274 return exception != null;
275 }
276
277 public boolean hasTimeout() {
278 return timeout != null;
279 }
280
281 public long getParentProcId() {
282 return parentProcId;
283 }
284
285 public NonceKey getNonceKey() {
286 return nonceKey;
287 }
288
289
290
291
292
293 public synchronized boolean isFailed() {
294 return exception != null || state == ProcedureState.ROLLEDBACK;
295 }
296
297
298
299
300 public synchronized boolean isSuccess() {
301 return state == ProcedureState.FINISHED && exception == null;
302 }
303
304
305
306
307
308 public synchronized boolean isFinished() {
309 switch (state) {
310 case ROLLEDBACK:
311 return true;
312 case FINISHED:
313 return exception == null;
314 default:
315 break;
316 }
317 return false;
318 }
319
320
321
322
323 public synchronized boolean isWaiting() {
324 switch (state) {
325 case WAITING:
326 case WAITING_TIMEOUT:
327 return true;
328 default:
329 break;
330 }
331 return false;
332 }
333
334 public synchronized RemoteProcedureException getException() {
335 return exception;
336 }
337
338 public long getStartTime() {
339 return startTime;
340 }
341
342 public synchronized long getLastUpdate() {
343 return lastUpdate;
344 }
345
346 public synchronized long elapsedTime() {
347 return lastUpdate - startTime;
348 }
349
350
351
352
353 protected void setTimeout(final int timeout) {
354 this.timeout = timeout;
355 }
356
357
358
359
360 public int getTimeout() {
361 return timeout;
362 }
363
364
365
366
367 public long getTimeRemaining() {
368 return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
369 }
370
371 @VisibleForTesting
372 @InterfaceAudience.Private
373 public void setOwner(final String owner) {
374 this.owner = StringUtils.isEmpty(owner) ? null : owner;
375 }
376
377 public String getOwner() {
378 return owner;
379 }
380
381 public boolean hasOwner() {
382 return owner != null;
383 }
384
385 @VisibleForTesting
386 @InterfaceAudience.Private
387 protected synchronized void setState(final ProcedureState state) {
388 this.state = state;
389 updateTimestamp();
390 }
391
392 @InterfaceAudience.Private
393 protected synchronized ProcedureState getState() {
394 return state;
395 }
396
397 protected void setFailure(final String source, final Throwable cause) {
398 setFailure(new RemoteProcedureException(source, cause));
399 }
400
401 protected synchronized void setFailure(final RemoteProcedureException exception) {
402 this.exception = exception;
403 if (!isFinished()) {
404 setState(ProcedureState.FINISHED);
405 }
406 }
407
408 protected void setAbortFailure(final String source, final String msg) {
409 setFailure(source, new ProcedureAbortedException(msg));
410 }
411
412 @InterfaceAudience.Private
413 protected synchronized boolean setTimeoutFailure() {
414 if (state == ProcedureState.WAITING_TIMEOUT) {
415 long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
416 setFailure("ProcedureExecutor", new TimeoutIOException(
417 "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
418 return true;
419 }
420 return false;
421 }
422
423
424
425
426 @VisibleForTesting
427 @InterfaceAudience.Private
428 protected void setProcId(final long procId) {
429 this.procId = procId;
430 this.startTime = EnvironmentEdgeManager.currentTime();
431 setState(ProcedureState.RUNNABLE);
432 }
433
434
435
436
437 @InterfaceAudience.Private
438 protected void setParentProcId(final long parentProcId) {
439 this.parentProcId = parentProcId;
440 }
441
442
443
444
445 @VisibleForTesting
446 @InterfaceAudience.Private
447 protected void setNonceKey(final NonceKey nonceKey) {
448 this.nonceKey = nonceKey;
449 }
450
451
452
453
454
455 @InterfaceAudience.Private
456 protected Procedure[] doExecute(final TEnvironment env)
457 throws ProcedureYieldException {
458 try {
459 updateTimestamp();
460 return execute(env);
461 } finally {
462 updateTimestamp();
463 }
464 }
465
466
467
468
469
470 @InterfaceAudience.Private
471 protected void doRollback(final TEnvironment env) throws IOException {
472 try {
473 updateTimestamp();
474 rollback(env);
475 } finally {
476 updateTimestamp();
477 }
478 }
479
480
481
482
483
484 @InterfaceAudience.Private
485 protected void setStartTime(final long startTime) {
486 this.startTime = startTime;
487 }
488
489
490
491
492
493 private synchronized void setLastUpdate(final long lastUpdate) {
494 this.lastUpdate = lastUpdate;
495 }
496
497 protected synchronized void updateTimestamp() {
498 this.lastUpdate = EnvironmentEdgeManager.currentTime();
499 }
500
501
502
503
504 @InterfaceAudience.Private
505 protected synchronized void setChildrenLatch(final int numChildren) {
506 this.childrenLatch = numChildren;
507 }
508
509
510
511
512 @InterfaceAudience.Private
513 protected synchronized void incChildrenLatch() {
514
515 this.childrenLatch++;
516 }
517
518
519
520
521
522 @InterfaceAudience.Private
523 protected synchronized boolean childrenCountDown() {
524 assert childrenLatch > 0;
525 return --childrenLatch == 0;
526 }
527
528
529
530
531
532 @InterfaceAudience.Private
533 protected synchronized void addStackIndex(final int index) {
534 if (stackIndexes == null) {
535 stackIndexes = new int[] { index };
536 } else {
537 int count = stackIndexes.length;
538 stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
539 stackIndexes[count] = index;
540 }
541 }
542
543 @InterfaceAudience.Private
544 protected synchronized boolean removeStackIndex() {
545 if (stackIndexes.length > 1) {
546 stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
547 return false;
548 } else {
549 stackIndexes = null;
550 return true;
551 }
552 }
553
554
555
556
557
558 @InterfaceAudience.Private
559 protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
560 this.stackIndexes = new int[stackIndexes.size()];
561 for (int i = 0; i < this.stackIndexes.length; ++i) {
562 this.stackIndexes[i] = stackIndexes.get(i);
563 }
564 }
565
566 @InterfaceAudience.Private
567 protected synchronized boolean wasExecuted() {
568 return stackIndexes != null;
569 }
570
571 @InterfaceAudience.Private
572 protected synchronized int[] getStackIndexes() {
573 return stackIndexes;
574 }
575
576 @Override
577 public int compareTo(final Procedure other) {
578 long diff = getProcId() - other.getProcId();
579 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
580 }
581
582
583
584
585 @InterfaceAudience.Private
586 protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
587 while (proc.hasParent()) {
588 proc = procedures.get(proc.getParentProcId());
589 if (proc == null) return null;
590 }
591 return proc.getProcId();
592 }
593
594 protected static Procedure newInstance(final String className) throws IOException {
595 try {
596 Class<?> clazz = Class.forName(className);
597 if (!Modifier.isPublic(clazz.getModifiers())) {
598 throw new Exception("the " + clazz + " class is not public");
599 }
600
601 Constructor<?> ctor = clazz.getConstructor();
602 assert ctor != null : "no constructor found";
603 if (!Modifier.isPublic(ctor.getModifiers())) {
604 throw new Exception("the " + clazz + " constructor is not public");
605 }
606 return (Procedure)ctor.newInstance();
607 } catch (Exception e) {
608 throw new IOException("The procedure class " + className +
609 " must be accessible and have an empty constructor", e);
610 }
611 }
612
613 protected static void validateClass(final Procedure proc) throws IOException {
614 try {
615 Class<?> clazz = proc.getClass();
616 if (!Modifier.isPublic(clazz.getModifiers())) {
617 throw new Exception("the " + clazz + " class is not public");
618 }
619
620 Constructor<?> ctor = clazz.getConstructor();
621 assert ctor != null;
622 if (!Modifier.isPublic(ctor.getModifiers())) {
623 throw new Exception("the " + clazz + " constructor is not public");
624 }
625 } catch (Exception e) {
626 throw new IOException("The procedure class " + proc.getClass().getName() +
627 " must be accessible and have an empty constructor", e);
628 }
629 }
630
631
632
633
634 @InterfaceAudience.Private
635 public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
636 RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
637 return new ProcedureInfo(
638 proc.getProcId(),
639 proc.toStringClass(),
640 proc.getOwner(),
641 proc.getState(),
642 proc.hasParent() ? proc.getParentProcId() : -1,
643 nonceKey,
644 exception != null ?
645 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
646 proc.getLastUpdate(),
647 proc.getStartTime(),
648 proc.getResult());
649 }
650
651
652
653
654
655 @InterfaceAudience.Private
656 public static ProcedureProtos.Procedure convert(final Procedure proc)
657 throws IOException {
658 Preconditions.checkArgument(proc != null);
659 validateClass(proc);
660
661 ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
662 .setClassName(proc.getClass().getName())
663 .setProcId(proc.getProcId())
664 .setState(proc.getState())
665 .setStartTime(proc.getStartTime())
666 .setLastUpdate(proc.getLastUpdate());
667
668 if (proc.hasParent()) {
669 builder.setParentId(proc.getParentProcId());
670 }
671
672 if (proc.hasTimeout()) {
673 builder.setTimeout(proc.getTimeout());
674 }
675
676 if (proc.hasOwner()) {
677 builder.setOwner(proc.getOwner());
678 }
679
680 int[] stackIds = proc.getStackIndexes();
681 if (stackIds != null) {
682 for (int i = 0; i < stackIds.length; ++i) {
683 builder.addStackId(stackIds[i]);
684 }
685 }
686
687 if (proc.hasException()) {
688 RemoteProcedureException exception = proc.getException();
689 builder.setException(
690 RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
691 }
692
693 byte[] result = proc.getResult();
694 if (result != null) {
695 builder.setResult(ByteStringer.wrap(result));
696 }
697
698 ByteString.Output stateStream = ByteString.newOutput();
699 proc.serializeStateData(stateStream);
700 if (stateStream.size() > 0) {
701 builder.setStateData(stateStream.toByteString());
702 }
703
704 if (proc.getNonceKey() != null) {
705 builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
706 builder.setNonce(proc.getNonceKey().getNonce());
707 }
708
709 return builder.build();
710 }
711
712
713
714
715
716
717
718
719
720
721 @InterfaceAudience.Private
722 public static Procedure convert(final ProcedureProtos.Procedure proto)
723 throws IOException {
724
725 Procedure proc = Procedure.newInstance(proto.getClassName());
726
727
728 proc.setProcId(proto.getProcId());
729 proc.setState(proto.getState());
730 proc.setStartTime(proto.getStartTime());
731 proc.setLastUpdate(proto.getLastUpdate());
732
733 if (proto.hasParentId()) {
734 proc.setParentProcId(proto.getParentId());
735 }
736
737 if (proto.hasOwner()) {
738 proc.setOwner(proto.getOwner());
739 }
740
741 if (proto.hasTimeout()) {
742 proc.setTimeout(proto.getTimeout());
743 }
744
745 if (proto.getStackIdCount() > 0) {
746 proc.setStackIndexes(proto.getStackIdList());
747 }
748
749 if (proto.hasException()) {
750 assert proc.getState() == ProcedureState.FINISHED ||
751 proc.getState() == ProcedureState.ROLLEDBACK :
752 "The procedure must be failed (waiting to rollback) or rolledback";
753 proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
754 }
755
756 if (proto.hasResult()) {
757 proc.setResult(proto.getResult().toByteArray());
758 }
759
760 if (proto.getNonce() != HConstants.NO_NONCE) {
761 NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
762 proc.setNonceKey(nonceKey);
763 }
764
765
766 proc.deserializeStateData(proto.getStateData().newInput());
767
768 return proc;
769 }
770 }