View Javadoc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.Modifier;
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.Map;
29  
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.ProcedureInfo;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.classification.InterfaceStability;
34  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
36  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
38  import org.apache.hadoop.hbase.util.ByteStringer;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  import org.apache.hadoop.hbase.util.NonceKey;
41  
42  import com.google.common.annotations.VisibleForTesting;
43  import com.google.common.base.Preconditions;
44  import com.google.protobuf.ByteString;
45  
46  /**
47   * Base Procedure class responsible to handle the Procedure Metadata
48   * e.g. state, startTime, lastUpdate, stack-indexes, ...
49   *
50   * execute() is called each time the procedure is executed.
51   * it may be called multiple times in case of failure and restart, so the
52   * code must be idempotent.
53   * the return is a set of sub-procedures or null in case the procedure doesn't
54   * have sub-procedures. Once the sub-procedures are successfully completed
55   * the execute() method is called again, you should think at it as a stack:
56   *  -> step 1
57   *  ---> step 2
58   *  -> step 1
59   *
60   * rollback() is called when the procedure or one of the sub-procedures is failed.
61   * the rollback step is supposed to cleanup the resources created during the
62   * execute() step. in case of failure and restart rollback() may be called
63   * multiple times, so the code must be idempotent.
64   */
65  @InterfaceAudience.Private
66  @InterfaceStability.Evolving
67  public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
68    // unchanged after initialization
69    private String owner = null;
70    private Long parentProcId = null;
71    private Long procId = null;
72    private long startTime;
73  
74    // runtime state, updated every operation
75    private ProcedureState state = ProcedureState.INITIALIZING;
76    private Integer timeout = null;
77    private int[] stackIndexes = null;
78    private int childrenLatch = 0;
79    private long lastUpdate;
80  
81    private RemoteProcedureException exception = null;
82    private byte[] result = null;
83  
84    private NonceKey nonceKey = null;
85  
86    /**
87     * The main code of the procedure. It must be idempotent since execute()
88     * may be called multiple time in case of machine failure in the middle
89     * of the execution.
90     * @return a set of sub-procedures or null if there is nothing else to execute.
91     */
92    protected abstract Procedure[] execute(TEnvironment env)
93      throws ProcedureYieldException;
94  
95    /**
96     * The code to undo what done by the execute() code.
97     * It is called when the procedure or one of the sub-procedure failed or an
98     * abort was requested. It should cleanup all the resources created by
99     * the execute() call. The implementation must be idempotent since rollback()
100    * may be called multiple time in case of machine failure in the middle
101    * of the execution.
102    * @throws IOException temporary failure, the rollback will retry later
103    */
104   protected abstract void rollback(TEnvironment env)
105     throws IOException;
106 
107   /**
108    * The abort() call is asynchronous and each procedure must decide how to deal
109    * with that, if they want to be abortable. The simplest implementation
110    * is to have an AtomicBoolean set in the abort() method and then the execute()
111    * will check if the abort flag is set or not.
112    * abort() may be called multiple times from the client, so the implementation
113    * must be idempotent.
114    *
115    * NOTE: abort() is not like Thread.interrupt() it is just a notification
116    * that allows the procedure implementor where to abort to avoid leak and
117    * have a better control on what was executed and what not.
118    */
119   protected abstract boolean abort(TEnvironment env);
120 
121   /**
122    * The user-level code of the procedure may have some state to
123    * persist (e.g. input arguments) to be able to resume on failure.
124    * @param stream the stream that will contain the user serialized data
125    */
126   protected abstract void serializeStateData(final OutputStream stream)
127     throws IOException;
128 
129   /**
130    * Called on store load to allow the user to decode the previously serialized
131    * state.
132    * @param stream the stream that contains the user serialized data
133    */
134   protected abstract void deserializeStateData(final InputStream stream)
135     throws IOException;
136 
137   /**
138    * The user should override this method, and try to take a lock if necessary.
139    * A lock can be anything, and it is up to the implementor.
140    * Example: in our Master we can execute request in parallel for different tables
141    *          create t1 and create t2 can be executed at the same time.
142    *          anything else on t1/t2 is queued waiting that specific table create to happen.
143    *
144    * @return true if the lock was acquired and false otherwise
145    */
146   protected boolean acquireLock(final TEnvironment env) {
147     return true;
148   }
149 
150   /**
151    * The user should override this method, and release lock if necessary.
152    */
153   protected void releaseLock(final TEnvironment env) {
154     // no-op
155   }
156 
157   /**
158    * Called when the procedure is loaded for replay.
159    * The procedure implementor may use this method to perform some quick
160    * operation before replay.
161    * e.g. failing the procedure if the state on replay may be unknown.
162    */
163   protected void beforeReplay(final TEnvironment env) {
164     // no-op
165   }
166 
167   /**
168    * Called when the procedure is marked as completed (success or rollback).
169    * The procedure implementor may use this method to cleanup in-memory states.
170    * This operation will not be retried on failure.
171    */
172   protected void completionCleanup(final TEnvironment env) {
173     // no-op
174   }
175 
176   @Override
177   public String toString() {
178     // Return the simple String presentation of the procedure.
179     return toStringSimpleSB().toString();
180   }
181 
182   /**
183    * Build the StringBuilder for the simple form of
184    * procedure string.
185    * @return the StringBuilder
186    */
187   protected StringBuilder toStringSimpleSB() {
188     StringBuilder sb = new StringBuilder();
189     toStringClassDetails(sb);
190 
191     if (procId != null) {
192       sb.append(" id=");
193       sb.append(getProcId());
194     }
195 
196     if (hasParent()) {
197       sb.append(" parent=");
198       sb.append(getParentProcId());
199     }
200 
201     if (hasOwner()) {
202       sb.append(" owner=");
203       sb.append(getOwner());
204     }
205 
206     sb.append(" state=");
207     sb.append(getState());
208 
209     return sb;
210   }
211 
212   /**
213    * Extend the toString() information with more procedure
214    * details
215    */
216   public String toStringDetails() {
217     StringBuilder sb = toStringSimpleSB();
218 
219     sb.append(" startTime=");
220     sb.append(getStartTime());
221 
222     sb.append(" lastUpdate=");
223     sb.append(getLastUpdate());
224 
225     if (stackIndexes != null) {
226       sb.append("\n");
227       sb.append("stackIndexes=");
228       sb.append(Arrays.toString(getStackIndexes()));
229     }
230 
231     return sb.toString();
232   }
233 
234   protected String toStringClass() {
235     StringBuilder sb = new StringBuilder();
236     toStringClassDetails(sb);
237 
238     return sb.toString();
239   }
240 
241   /**
242    * Extend the toString() information with the procedure details
243    * e.g. className and parameters
244    * @param builder the string builder to use to append the proc specific information
245    */
246   protected void toStringClassDetails(StringBuilder builder) {
247     builder.append(getClass().getName());
248   }
249 
250   /**
251    * @return the serialized result if any, otherwise null
252    */
253   public byte[] getResult() {
254     return result;
255   }
256 
257   /**
258    * The procedure may leave a "result" on completion.
259    * @param result the serialized result that will be passed to the client
260    */
261   protected void setResult(final byte[] result) {
262     this.result = result;
263   }
264 
265   public long getProcId() {
266     return procId;
267   }
268 
269   public boolean hasParent() {
270     return parentProcId != null;
271   }
272 
273   public boolean hasException() {
274     return exception != null;
275   }
276 
277   public boolean hasTimeout() {
278     return timeout != null;
279   }
280 
281   public long getParentProcId() {
282     return parentProcId;
283   }
284 
285   public NonceKey getNonceKey() {
286     return nonceKey;
287   }
288 
289   /**
290    * @return true if the procedure has failed.
291    *         true may mean failed but not yet rolledback or failed and rolledback.
292    */
293   public synchronized boolean isFailed() {
294     return exception != null || state == ProcedureState.ROLLEDBACK;
295   }
296 
297   /**
298    * @return true if the procedure is finished successfully.
299    */
300   public synchronized boolean isSuccess() {
301     return state == ProcedureState.FINISHED && exception == null;
302   }
303 
304   /**
305    * @return true if the procedure is finished. The Procedure may be completed
306    *         successfuly or failed and rolledback.
307    */
308   public synchronized boolean isFinished() {
309     switch (state) {
310       case ROLLEDBACK:
311         return true;
312       case FINISHED:
313         return exception == null;
314       default:
315         break;
316     }
317     return false;
318   }
319 
320   /**
321    * @return true if the procedure is waiting for a child to finish or for an external event.
322    */
323   public synchronized boolean isWaiting() {
324     switch (state) {
325       case WAITING:
326       case WAITING_TIMEOUT:
327         return true;
328       default:
329         break;
330     }
331     return false;
332   }
333 
334   public synchronized RemoteProcedureException getException() {
335     return exception;
336   }
337 
338   public long getStartTime() {
339     return startTime;
340   }
341 
342   public synchronized long getLastUpdate() {
343     return lastUpdate;
344   }
345 
346   public synchronized long elapsedTime() {
347     return lastUpdate - startTime;
348   }
349 
350   /**
351    * @param timeout timeout in msec
352    */
353   protected void setTimeout(final int timeout) {
354     this.timeout = timeout;
355   }
356 
357   /**
358    * @return the timeout in msec
359    */
360   public int getTimeout() {
361     return timeout;
362   }
363 
364   /**
365    * @return the remaining time before the timeout
366    */
367   public long getTimeRemaining() {
368     return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
369   }
370 
371   @VisibleForTesting
372   @InterfaceAudience.Private
373   public void setOwner(final String owner) {
374     this.owner = StringUtils.isEmpty(owner) ? null : owner;
375   }
376 
377   public String getOwner() {
378     return owner;
379   }
380 
381   public boolean hasOwner() {
382     return owner != null;
383   }
384 
385   @VisibleForTesting
386   @InterfaceAudience.Private
387   protected synchronized void setState(final ProcedureState state) {
388     this.state = state;
389     updateTimestamp();
390   }
391 
392   @InterfaceAudience.Private
393   protected synchronized ProcedureState getState() {
394     return state;
395   }
396 
397   protected void setFailure(final String source, final Throwable cause) {
398     setFailure(new RemoteProcedureException(source, cause));
399   }
400 
401   protected synchronized void setFailure(final RemoteProcedureException exception) {
402     this.exception = exception;
403     if (!isFinished()) {
404       setState(ProcedureState.FINISHED);
405     }
406   }
407 
408   protected void setAbortFailure(final String source, final String msg) {
409     setFailure(source, new ProcedureAbortedException(msg));
410   }
411 
412   @InterfaceAudience.Private
413   protected synchronized boolean setTimeoutFailure() {
414     if (state == ProcedureState.WAITING_TIMEOUT) {
415       long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
416       setFailure("ProcedureExecutor", new TimeoutIOException(
417         "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
418       return true;
419     }
420     return false;
421   }
422 
423   /**
424    * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
425    */
426   @VisibleForTesting
427   @InterfaceAudience.Private
428   protected void setProcId(final long procId) {
429     this.procId = procId;
430     this.startTime = EnvironmentEdgeManager.currentTime();
431     setState(ProcedureState.RUNNABLE);
432   }
433 
434   /**
435    * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
436    */
437   @InterfaceAudience.Private
438   protected void setParentProcId(final long parentProcId) {
439     this.parentProcId = parentProcId;
440   }
441 
442   /**
443    * Called by the ProcedureExecutor to set the value to the newly created procedure.
444    */
445   @VisibleForTesting
446   @InterfaceAudience.Private
447   protected void setNonceKey(final NonceKey nonceKey) {
448     this.nonceKey = nonceKey;
449   }
450 
451   /**
452    * Internal method called by the ProcedureExecutor that starts the
453    * user-level code execute().
454    */
455   @InterfaceAudience.Private
456   protected Procedure[] doExecute(final TEnvironment env)
457       throws ProcedureYieldException {
458     try {
459       updateTimestamp();
460       return execute(env);
461     } finally {
462       updateTimestamp();
463     }
464   }
465 
466   /**
467    * Internal method called by the ProcedureExecutor that starts the
468    * user-level code rollback().
469    */
470   @InterfaceAudience.Private
471   protected void doRollback(final TEnvironment env) throws IOException {
472     try {
473       updateTimestamp();
474       rollback(env);
475     } finally {
476       updateTimestamp();
477     }
478   }
479 
480   /**
481    * Called on store load to initialize the Procedure internals after
482    * the creation/deserialization.
483    */
484   @InterfaceAudience.Private
485   protected void setStartTime(final long startTime) {
486     this.startTime = startTime;
487   }
488 
489   /**
490    * Called on store load to initialize the Procedure internals after
491    * the creation/deserialization.
492    */
493   private synchronized void setLastUpdate(final long lastUpdate) {
494     this.lastUpdate = lastUpdate;
495   }
496 
497   protected synchronized void updateTimestamp() {
498     this.lastUpdate = EnvironmentEdgeManager.currentTime();
499   }
500 
501   /**
502    * Called by the ProcedureExecutor on procedure-load to restore the latch state
503    */
504   @InterfaceAudience.Private
505   protected synchronized void setChildrenLatch(final int numChildren) {
506     this.childrenLatch = numChildren;
507   }
508 
509   /**
510    * Called by the ProcedureExecutor on procedure-load to restore the latch state
511    */
512   @InterfaceAudience.Private
513   protected synchronized void incChildrenLatch() {
514     // TODO: can this be inferred from the stack? I think so...
515     this.childrenLatch++;
516   }
517 
518   /**
519    * Called by the ProcedureExecutor to notify that one of the sub-procedures
520    * has completed.
521    */
522   @InterfaceAudience.Private
523   protected synchronized boolean childrenCountDown() {
524     assert childrenLatch > 0;
525     return --childrenLatch == 0;
526   }
527 
528   /**
529    * Called by the RootProcedureState on procedure execution.
530    * Each procedure store its stack-index positions.
531    */
532   @InterfaceAudience.Private
533   protected synchronized void addStackIndex(final int index) {
534     if (stackIndexes == null) {
535       stackIndexes = new int[] { index };
536     } else {
537       int count = stackIndexes.length;
538       stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
539       stackIndexes[count] = index;
540     }
541   }
542 
543   @InterfaceAudience.Private
544   protected synchronized boolean removeStackIndex() {
545     if (stackIndexes.length > 1) {
546       stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
547       return false;
548     } else {
549       stackIndexes = null;
550       return true;
551     }
552   }
553 
554   /**
555    * Called on store load to initialize the Procedure internals after
556    * the creation/deserialization.
557    */
558   @InterfaceAudience.Private
559   protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
560     this.stackIndexes = new int[stackIndexes.size()];
561     for (int i = 0; i < this.stackIndexes.length; ++i) {
562       this.stackIndexes[i] = stackIndexes.get(i);
563     }
564   }
565 
566   @InterfaceAudience.Private
567   protected synchronized boolean wasExecuted() {
568     return stackIndexes != null;
569   }
570 
571   @InterfaceAudience.Private
572   protected synchronized int[] getStackIndexes() {
573     return stackIndexes;
574   }
575 
576   @Override
577   public int compareTo(final Procedure other) {
578     long diff = getProcId() - other.getProcId();
579     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
580   }
581 
582   /*
583    * Helper to lookup the root Procedure ID given a specified procedure.
584    */
585   @InterfaceAudience.Private
586   protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
587     while (proc.hasParent()) {
588       proc = procedures.get(proc.getParentProcId());
589       if (proc == null) return null;
590     }
591     return proc.getProcId();
592   }
593 
594   protected static Procedure newInstance(final String className) throws IOException {
595     try {
596       Class<?> clazz = Class.forName(className);
597       if (!Modifier.isPublic(clazz.getModifiers())) {
598         throw new Exception("the " + clazz + " class is not public");
599       }
600 
601       Constructor<?> ctor = clazz.getConstructor();
602       assert ctor != null : "no constructor found";
603       if (!Modifier.isPublic(ctor.getModifiers())) {
604         throw new Exception("the " + clazz + " constructor is not public");
605       }
606       return (Procedure)ctor.newInstance();
607     } catch (Exception e) {
608       throw new IOException("The procedure class " + className +
609           " must be accessible and have an empty constructor", e);
610     }
611   }
612 
613   protected static void validateClass(final Procedure proc) throws IOException {
614     try {
615       Class<?> clazz = proc.getClass();
616       if (!Modifier.isPublic(clazz.getModifiers())) {
617         throw new Exception("the " + clazz + " class is not public");
618       }
619 
620       Constructor<?> ctor = clazz.getConstructor();
621       assert ctor != null;
622       if (!Modifier.isPublic(ctor.getModifiers())) {
623         throw new Exception("the " + clazz + " constructor is not public");
624       }
625     } catch (Exception e) {
626       throw new IOException("The procedure class " + proc.getClass().getName() +
627           " must be accessible and have an empty constructor", e);
628     }
629   }
630 
631   /**
632    * Helper to create the ProcedureInfo from Procedure.
633    */
634   @InterfaceAudience.Private
635   public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
636     RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
637     return new ProcedureInfo(
638       proc.getProcId(),
639       proc.toStringClass(),
640       proc.getOwner(),
641       proc.getState(),
642       proc.hasParent() ? proc.getParentProcId() : -1,
643       nonceKey,
644       exception != null ?
645           RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
646       proc.getLastUpdate(),
647       proc.getStartTime(),
648       proc.getResult());
649   }
650 
651   /**
652    * Helper to convert the procedure to protobuf.
653    * Used by ProcedureStore implementations.
654    */
655   @InterfaceAudience.Private
656   public static ProcedureProtos.Procedure convert(final Procedure proc)
657       throws IOException {
658     Preconditions.checkArgument(proc != null);
659     validateClass(proc);
660 
661     ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
662       .setClassName(proc.getClass().getName())
663       .setProcId(proc.getProcId())
664       .setState(proc.getState())
665       .setStartTime(proc.getStartTime())
666       .setLastUpdate(proc.getLastUpdate());
667 
668     if (proc.hasParent()) {
669       builder.setParentId(proc.getParentProcId());
670     }
671 
672     if (proc.hasTimeout()) {
673       builder.setTimeout(proc.getTimeout());
674     }
675 
676     if (proc.hasOwner()) {
677       builder.setOwner(proc.getOwner());
678     }
679 
680     int[] stackIds = proc.getStackIndexes();
681     if (stackIds != null) {
682       for (int i = 0; i < stackIds.length; ++i) {
683         builder.addStackId(stackIds[i]);
684       }
685     }
686 
687     if (proc.hasException()) {
688       RemoteProcedureException exception = proc.getException();
689       builder.setException(
690         RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
691     }
692 
693     byte[] result = proc.getResult();
694     if (result != null) {
695       builder.setResult(ByteStringer.wrap(result));
696     }
697 
698     ByteString.Output stateStream = ByteString.newOutput();
699     proc.serializeStateData(stateStream);
700     if (stateStream.size() > 0) {
701       builder.setStateData(stateStream.toByteString());
702     }
703 
704     if (proc.getNonceKey() != null) {
705       builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
706       builder.setNonce(proc.getNonceKey().getNonce());
707     }
708 
709     return builder.build();
710   }
711 
712   /**
713    * Helper to convert the protobuf procedure.
714    * Used by ProcedureStore implementations.
715    *
716    * TODO: OPTIMIZATION: some of the field never change during the execution
717    *                     (e.g. className, procId, parentId, ...).
718    *                     We can split in 'data' and 'state', and the store
719    *                     may take advantage of it by storing the data only on insert().
720    */
721   @InterfaceAudience.Private
722   public static Procedure convert(final ProcedureProtos.Procedure proto)
723       throws IOException {
724     // Procedure from class name
725     Procedure proc = Procedure.newInstance(proto.getClassName());
726 
727     // set fields
728     proc.setProcId(proto.getProcId());
729     proc.setState(proto.getState());
730     proc.setStartTime(proto.getStartTime());
731     proc.setLastUpdate(proto.getLastUpdate());
732 
733     if (proto.hasParentId()) {
734       proc.setParentProcId(proto.getParentId());
735     }
736 
737     if (proto.hasOwner()) {
738       proc.setOwner(proto.getOwner());
739     }
740 
741     if (proto.hasTimeout()) {
742       proc.setTimeout(proto.getTimeout());
743     }
744 
745     if (proto.getStackIdCount() > 0) {
746       proc.setStackIndexes(proto.getStackIdList());
747     }
748 
749     if (proto.hasException()) {
750       assert proc.getState() == ProcedureState.FINISHED ||
751              proc.getState() == ProcedureState.ROLLEDBACK :
752              "The procedure must be failed (waiting to rollback) or rolledback";
753       proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
754     }
755 
756     if (proto.hasResult()) {
757       proc.setResult(proto.getResult().toByteArray());
758     }
759 
760     if (proto.getNonce() != HConstants.NO_NONCE) {
761       NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
762       proc.setNonceKey(nonceKey);
763     }
764 
765     // we want to call deserialize even when the stream is empty, mainly for testing.
766     proc.deserializeStateData(proto.getStateData().newInput());
767 
768     return proc;
769   }
770 }