View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.lang.reflect.Constructor;
25  import java.lang.reflect.Modifier;
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.Map;
29  
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.ProcedureInfo;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.classification.InterfaceStability;
34  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35  import org.apache.hadoop.hbase.procedure2.util.StringUtils;
36  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
38  import org.apache.hadoop.hbase.util.ByteStringer;
39  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
40  import org.apache.hadoop.hbase.util.NonceKey;
41  
42  import com.google.common.annotations.VisibleForTesting;
43  import com.google.common.base.Preconditions;
44  import com.google.protobuf.ByteString;
45  
46  /**
47   * Base Procedure class responsible to handle the Procedure Metadata
48   * e.g. state, startTime, lastUpdate, stack-indexes, ...
49   *
50   * execute() is called each time the procedure is executed.
51   * it may be called multiple times in case of failure and restart, so the
52   * code must be idempotent.
53   * the return is a set of sub-procedures or null in case the procedure doesn't
54   * have sub-procedures. Once the sub-procedures are successfully completed
55   * the execute() method is called again, you should think at it as a stack:
56   *  -> step 1
57   *  ---> step 2
58   *  -> step 1
59   *
60   * rollback() is called when the procedure or one of the sub-procedures is failed.
61   * the rollback step is supposed to cleanup the resources created during the
62   * execute() step. in case of failure and restart rollback() may be called
63   * multiple times, so the code must be idempotent.
64   */
65  @InterfaceAudience.Private
66  @InterfaceStability.Evolving
67  public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
68    // unchanged after initialization
69    private String owner = null;
70    private Long parentProcId = null;
71    private Long procId = null;
72    private long startTime;
73  
74    // runtime state, updated every operation
75    private ProcedureState state = ProcedureState.INITIALIZING;
76    private Integer timeout = null;
77    private int[] stackIndexes = null;
78    private int childrenLatch = 0;
79    private long lastUpdate;
80  
81    private RemoteProcedureException exception = null;
82    private byte[] result = null;
83  
84    private NonceKey nonceKey = null;
85  
86    /**
87     * The main code of the procedure. It must be idempotent since execute()
88     * may be called multiple time in case of machine failure in the middle
89     * of the execution.
90     * @param env the environment passed to the ProcedureExecutor
91     * @return a set of sub-procedures or null if there is nothing else to execute.
92     * @throws ProcedureYieldException the procedure will be added back to the queue and retried later
93     * @throws InterruptedException the procedure will be added back to the queue and retried later
94     */
95    protected abstract Procedure[] execute(TEnvironment env)
96      throws ProcedureYieldException, InterruptedException;
97  
98    /**
99     * The code to undo what done by the execute() code.
100    * It is called when the procedure or one of the sub-procedure failed or an
101    * abort was requested. It should cleanup all the resources created by
102    * the execute() call. The implementation must be idempotent since rollback()
103    * may be called multiple time in case of machine failure in the middle
104    * of the execution.
105    * @param env the environment passed to the ProcedureExecutor
106    * @throws IOException temporary failure, the rollback will retry later
107    * @throws InterruptedException the procedure will be added back to the queue and retried later
108    */
109   protected abstract void rollback(TEnvironment env)
110     throws IOException, InterruptedException;
111 
112   /**
113    * The abort() call is asynchronous and each procedure must decide how to deal
114    * with that, if they want to be abortable. The simplest implementation
115    * is to have an AtomicBoolean set in the abort() method and then the execute()
116    * will check if the abort flag is set or not.
117    * abort() may be called multiple times from the client, so the implementation
118    * must be idempotent.
119    *
120    * NOTE: abort() is not like Thread.interrupt() it is just a notification
121    * that allows the procedure implementor where to abort to avoid leak and
122    * have a better control on what was executed and what not.
123    */
124   protected abstract boolean abort(TEnvironment env);
125 
126   /**
127    * The user-level code of the procedure may have some state to
128    * persist (e.g. input arguments) to be able to resume on failure.
129    * @param stream the stream that will contain the user serialized data
130    */
131   protected abstract void serializeStateData(final OutputStream stream)
132     throws IOException;
133 
134   /**
135    * Called on store load to allow the user to decode the previously serialized
136    * state.
137    * @param stream the stream that contains the user serialized data
138    */
139   protected abstract void deserializeStateData(final InputStream stream)
140     throws IOException;
141 
142   /**
143    * The user should override this method, and try to take a lock if necessary.
144    * A lock can be anything, and it is up to the implementor.
145    * Example: in our Master we can execute request in parallel for different tables
146    *          create t1 and create t2 can be executed at the same time.
147    *          anything else on t1/t2 is queued waiting that specific table create to happen.
148    *
149    * @return true if the lock was acquired and false otherwise
150    */
151   protected boolean acquireLock(final TEnvironment env) {
152     return true;
153   }
154 
155   /**
156    * The user should override this method, and release lock if necessary.
157    */
158   protected void releaseLock(final TEnvironment env) {
159     // no-op
160   }
161 
162   /**
163    * Called when the procedure is loaded for replay.
164    * The procedure implementor may use this method to perform some quick
165    * operation before replay.
166    * e.g. failing the procedure if the state on replay may be unknown.
167    */
168   protected void beforeReplay(final TEnvironment env) {
169     // no-op
170   }
171 
172   /**
173    * Called when the procedure is marked as completed (success or rollback).
174    * The procedure implementor may use this method to cleanup in-memory states.
175    * This operation will not be retried on failure.
176    */
177   protected void completionCleanup(final TEnvironment env) {
178     // no-op
179   }
180 
181   /**
182    * By default, the executor will try ro run procedures start to finish.
183    * Return true to make the executor yield between each execution step to
184    * give other procedures time to run their steps.
185    * @param env the environment passed to the ProcedureExecutor
186    * @return Return true if the executor should yield on completion of an execution step.
187    *         Defaults to return false.
188    */
189   protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
190     return false;
191   }
192 
193   /**
194    * By default, the executor will keep the procedure result around util
195    * the eviction TTL is expired. The client can cut down the waiting time
196    * by requesting that the result is removed from the executor.
197    * In case of system started procedure, we can force the executor to auto-ack.
198    * @param env the environment passed to the ProcedureExecutor
199    * @return true if the executor should wait the client ack for the result.
200    *         Defaults to return true.
201    */
202   protected boolean shouldWaitClientAck(final TEnvironment env) {
203     return true;
204   }
205 
206   @Override
207   public String toString() {
208     // Return the simple String presentation of the procedure.
209     return toStringSimpleSB().toString();
210   }
211 
212   /**
213    * Build the StringBuilder for the simple form of
214    * procedure string.
215    * @return the StringBuilder
216    */
217   protected StringBuilder toStringSimpleSB() {
218     StringBuilder sb = new StringBuilder();
219     toStringClassDetails(sb);
220 
221     if (procId != null) {
222       sb.append(" id=");
223       sb.append(getProcId());
224     }
225 
226     if (hasParent()) {
227       sb.append(" parent=");
228       sb.append(getParentProcId());
229     }
230 
231     if (hasOwner()) {
232       sb.append(" owner=");
233       sb.append(getOwner());
234     }
235 
236     sb.append(" state=");
237     toStringState(sb);
238 
239     return sb;
240   }
241 
242   /**
243    * Extend the toString() information with more procedure
244    * details
245    */
246   public String toStringDetails() {
247     StringBuilder sb = toStringSimpleSB();
248 
249     sb.append(" startTime=");
250     sb.append(getStartTime());
251 
252     sb.append(" lastUpdate=");
253     sb.append(getLastUpdate());
254 
255     if (stackIndexes != null) {
256       sb.append("\n");
257       sb.append("stackIndexes=");
258       sb.append(Arrays.toString(getStackIndexes()));
259     }
260 
261     return sb.toString();
262   }
263 
264   protected String toStringClass() {
265     StringBuilder sb = new StringBuilder();
266     toStringClassDetails(sb);
267 
268     return sb.toString();
269   }
270 
271   /**
272    * Called from {@link #toString()} when interpolating {@link Procedure} state
273    * @param builder Append current {@link ProcedureState}
274    */
275   protected void toStringState(StringBuilder builder) {
276     builder.append(getState());
277   }
278 
279   /**
280    * Extend the toString() information with the procedure details
281    * e.g. className and parameters
282    * @param builder the string builder to use to append the proc specific information
283    */
284   protected void toStringClassDetails(StringBuilder builder) {
285     builder.append(getClass().getName());
286   }
287 
288   /**
289    * @return the serialized result if any, otherwise null
290    */
291   public byte[] getResult() {
292     return result;
293   }
294 
295   /**
296    * The procedure may leave a "result" on completion.
297    * @param result the serialized result that will be passed to the client
298    */
299   protected void setResult(final byte[] result) {
300     this.result = result;
301   }
302 
303   public long getProcId() {
304     return procId;
305   }
306 
307   public boolean hasParent() {
308     return parentProcId != null;
309   }
310 
311   public boolean hasException() {
312     return exception != null;
313   }
314 
315   public boolean hasTimeout() {
316     return timeout != null;
317   }
318 
319   public long getParentProcId() {
320     return parentProcId;
321   }
322 
323   public NonceKey getNonceKey() {
324     return nonceKey;
325   }
326 
327   /**
328    * @return true if the procedure has failed.
329    *         true may mean failed but not yet rolledback or failed and rolledback.
330    */
331   public synchronized boolean isFailed() {
332     return exception != null || state == ProcedureState.ROLLEDBACK;
333   }
334 
335   /**
336    * @return true if the procedure is finished successfully.
337    */
338   public synchronized boolean isSuccess() {
339     return state == ProcedureState.FINISHED && exception == null;
340   }
341 
342   /**
343    * @return true if the procedure is finished. The Procedure may be completed
344    *         successfuly or failed and rolledback.
345    */
346   public synchronized boolean isFinished() {
347     switch (state) {
348       case ROLLEDBACK:
349         return true;
350       case FINISHED:
351         return exception == null;
352       default:
353         break;
354     }
355     return false;
356   }
357 
358   /**
359    * @return true if the procedure is waiting for a child to finish or for an external event.
360    */
361   public synchronized boolean isWaiting() {
362     switch (state) {
363       case WAITING:
364       case WAITING_TIMEOUT:
365         return true;
366       default:
367         break;
368     }
369     return false;
370   }
371 
372   public synchronized RemoteProcedureException getException() {
373     return exception;
374   }
375 
376   public long getStartTime() {
377     return startTime;
378   }
379 
380   public synchronized long getLastUpdate() {
381     return lastUpdate;
382   }
383 
384   public synchronized long elapsedTime() {
385     return lastUpdate - startTime;
386   }
387 
388   /**
389    * @param timeout timeout in msec
390    */
391   protected void setTimeout(final int timeout) {
392     this.timeout = timeout;
393   }
394 
395   /**
396    * @return the timeout in msec
397    */
398   public int getTimeout() {
399     return timeout;
400   }
401 
402   /**
403    * @return the remaining time before the timeout
404    */
405   public long getTimeRemaining() {
406     return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime));
407   }
408 
409   @VisibleForTesting
410   @InterfaceAudience.Private
411   public void setOwner(final String owner) {
412     this.owner = StringUtils.isEmpty(owner) ? null : owner;
413   }
414 
415   public String getOwner() {
416     return owner;
417   }
418 
419   public boolean hasOwner() {
420     return owner != null;
421   }
422 
423   @VisibleForTesting
424   @InterfaceAudience.Private
425   protected synchronized void setState(final ProcedureState state) {
426     this.state = state;
427     updateTimestamp();
428   }
429 
430   @InterfaceAudience.Private
431   protected synchronized ProcedureState getState() {
432     return state;
433   }
434 
435   protected void setFailure(final String source, final Throwable cause) {
436     setFailure(new RemoteProcedureException(source, cause));
437   }
438 
439   protected synchronized void setFailure(final RemoteProcedureException exception) {
440     this.exception = exception;
441     if (!isFinished()) {
442       setState(ProcedureState.FINISHED);
443     }
444   }
445 
446   protected void setAbortFailure(final String source, final String msg) {
447     setFailure(source, new ProcedureAbortedException(msg));
448   }
449 
450   @InterfaceAudience.Private
451   protected synchronized boolean setTimeoutFailure() {
452     if (state == ProcedureState.WAITING_TIMEOUT) {
453       long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate;
454       setFailure("ProcedureExecutor", new TimeoutIOException(
455         "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff)));
456       return true;
457     }
458     return false;
459   }
460 
461   /**
462    * Called by the ProcedureExecutor to assign the ID to the newly created procedure.
463    */
464   @VisibleForTesting
465   @InterfaceAudience.Private
466   protected void setProcId(final long procId) {
467     this.procId = procId;
468     this.startTime = EnvironmentEdgeManager.currentTime();
469     setState(ProcedureState.RUNNABLE);
470   }
471 
472   /**
473    * Called by the ProcedureExecutor to assign the parent to the newly created procedure.
474    */
475   @InterfaceAudience.Private
476   protected void setParentProcId(final long parentProcId) {
477     this.parentProcId = parentProcId;
478   }
479 
480   /**
481    * Called by the ProcedureExecutor to set the value to the newly created procedure.
482    */
483   @VisibleForTesting
484   @InterfaceAudience.Private
485   protected void setNonceKey(final NonceKey nonceKey) {
486     this.nonceKey = nonceKey;
487   }
488 
489   /**
490    * Internal method called by the ProcedureExecutor that starts the
491    * user-level code execute().
492    */
493   @InterfaceAudience.Private
494   protected Procedure[] doExecute(final TEnvironment env)
495       throws ProcedureYieldException, InterruptedException {
496     try {
497       updateTimestamp();
498       return execute(env);
499     } finally {
500       updateTimestamp();
501     }
502   }
503 
504   /**
505    * Internal method called by the ProcedureExecutor that starts the
506    * user-level code rollback().
507    */
508   @InterfaceAudience.Private
509   protected void doRollback(final TEnvironment env)
510       throws IOException, InterruptedException {
511     try {
512       updateTimestamp();
513       rollback(env);
514     } finally {
515       updateTimestamp();
516     }
517   }
518 
519   /**
520    * Called on store load to initialize the Procedure internals after
521    * the creation/deserialization.
522    */
523   @InterfaceAudience.Private
524   protected void setStartTime(final long startTime) {
525     this.startTime = startTime;
526   }
527 
528   /**
529    * Called on store load to initialize the Procedure internals after
530    * the creation/deserialization.
531    */
532   private synchronized void setLastUpdate(final long lastUpdate) {
533     this.lastUpdate = lastUpdate;
534   }
535 
536   protected synchronized void updateTimestamp() {
537     this.lastUpdate = EnvironmentEdgeManager.currentTime();
538   }
539 
540   /**
541    * Called by the ProcedureExecutor on procedure-load to restore the latch state
542    */
543   @InterfaceAudience.Private
544   protected synchronized void setChildrenLatch(final int numChildren) {
545     this.childrenLatch = numChildren;
546   }
547 
548   /**
549    * Called by the ProcedureExecutor on procedure-load to restore the latch state
550    */
551   @InterfaceAudience.Private
552   protected synchronized void incChildrenLatch() {
553     // TODO: can this be inferred from the stack? I think so...
554     this.childrenLatch++;
555   }
556 
557   /**
558    * Called by the ProcedureExecutor to notify that one of the sub-procedures
559    * has completed.
560    */
561   @InterfaceAudience.Private
562   protected synchronized boolean childrenCountDown() {
563     assert childrenLatch > 0;
564     return --childrenLatch == 0;
565   }
566 
567   /**
568    * Called by the RootProcedureState on procedure execution.
569    * Each procedure store its stack-index positions.
570    */
571   @InterfaceAudience.Private
572   protected synchronized void addStackIndex(final int index) {
573     if (stackIndexes == null) {
574       stackIndexes = new int[] { index };
575     } else {
576       int count = stackIndexes.length;
577       stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
578       stackIndexes[count] = index;
579     }
580   }
581 
582   @InterfaceAudience.Private
583   protected synchronized boolean removeStackIndex() {
584     if (stackIndexes.length > 1) {
585       stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
586       return false;
587     } else {
588       stackIndexes = null;
589       return true;
590     }
591   }
592 
593   /**
594    * Called on store load to initialize the Procedure internals after
595    * the creation/deserialization.
596    */
597   @InterfaceAudience.Private
598   protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
599     this.stackIndexes = new int[stackIndexes.size()];
600     for (int i = 0; i < this.stackIndexes.length; ++i) {
601       this.stackIndexes[i] = stackIndexes.get(i);
602     }
603   }
604 
605   @InterfaceAudience.Private
606   protected synchronized boolean wasExecuted() {
607     return stackIndexes != null;
608   }
609 
610   @InterfaceAudience.Private
611   protected synchronized int[] getStackIndexes() {
612     return stackIndexes;
613   }
614 
615   @Override
616   public int compareTo(final Procedure other) {
617     long diff = getProcId() - other.getProcId();
618     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
619   }
620 
621   /**
622    * Get an hashcode for the specified Procedure ID
623    * @return the hashcode for the specified procId
624    */
625   public static long getProcIdHashCode(final long procId) {
626     long h = procId;
627     h ^= h >> 16;
628     h *= 0x85ebca6b;
629     h ^= h >> 13;
630     h *= 0xc2b2ae35;
631     h ^= h >> 16;
632     return h;
633   }
634 
635   /*
636    * Helper to lookup the root Procedure ID given a specified procedure.
637    */
638   @InterfaceAudience.Private
639   protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) {
640     while (proc.hasParent()) {
641       proc = procedures.get(proc.getParentProcId());
642       if (proc == null) return null;
643     }
644     return proc.getProcId();
645   }
646 
647   protected static Procedure newInstance(final String className) throws IOException {
648     try {
649       Class<?> clazz = Class.forName(className);
650       if (!Modifier.isPublic(clazz.getModifiers())) {
651         throw new Exception("the " + clazz + " class is not public");
652       }
653 
654       Constructor<?> ctor = clazz.getConstructor();
655       assert ctor != null : "no constructor found";
656       if (!Modifier.isPublic(ctor.getModifiers())) {
657         throw new Exception("the " + clazz + " constructor is not public");
658       }
659       return (Procedure)ctor.newInstance();
660     } catch (Exception e) {
661       throw new IOException("The procedure class " + className +
662           " must be accessible and have an empty constructor", e);
663     }
664   }
665 
666   protected static void validateClass(final Procedure proc) throws IOException {
667     try {
668       Class<?> clazz = proc.getClass();
669       if (!Modifier.isPublic(clazz.getModifiers())) {
670         throw new Exception("the " + clazz + " class is not public");
671       }
672 
673       Constructor<?> ctor = clazz.getConstructor();
674       assert ctor != null;
675       if (!Modifier.isPublic(ctor.getModifiers())) {
676         throw new Exception("the " + clazz + " constructor is not public");
677       }
678     } catch (Exception e) {
679       throw new IOException("The procedure class " + proc.getClass().getName() +
680           " must be accessible and have an empty constructor", e);
681     }
682   }
683 
684   /**
685    * Helper to create the ProcedureInfo from Procedure.
686    */
687   @InterfaceAudience.Private
688   public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
689     RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
690     return new ProcedureInfo(
691       proc.getProcId(),
692       proc.toStringClass(),
693       proc.getOwner(),
694       proc.getState(),
695       proc.hasParent() ? proc.getParentProcId() : -1,
696       nonceKey,
697       exception != null ?
698           RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
699       proc.getLastUpdate(),
700       proc.getStartTime(),
701       proc.getResult());
702   }
703 
704   /**
705    * Helper to convert the procedure to protobuf.
706    * Used by ProcedureStore implementations.
707    */
708   @InterfaceAudience.Private
709   public static ProcedureProtos.Procedure convert(final Procedure proc)
710       throws IOException {
711     Preconditions.checkArgument(proc != null);
712     validateClass(proc);
713 
714     ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
715       .setClassName(proc.getClass().getName())
716       .setProcId(proc.getProcId())
717       .setState(proc.getState())
718       .setStartTime(proc.getStartTime())
719       .setLastUpdate(proc.getLastUpdate());
720 
721     if (proc.hasParent()) {
722       builder.setParentId(proc.getParentProcId());
723     }
724 
725     if (proc.hasTimeout()) {
726       builder.setTimeout(proc.getTimeout());
727     }
728 
729     if (proc.hasOwner()) {
730       builder.setOwner(proc.getOwner());
731     }
732 
733     int[] stackIds = proc.getStackIndexes();
734     if (stackIds != null) {
735       for (int i = 0; i < stackIds.length; ++i) {
736         builder.addStackId(stackIds[i]);
737       }
738     }
739 
740     if (proc.hasException()) {
741       RemoteProcedureException exception = proc.getException();
742       builder.setException(
743         RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
744     }
745 
746     byte[] result = proc.getResult();
747     if (result != null) {
748       builder.setResult(ByteStringer.wrap(result));
749     }
750 
751     ByteString.Output stateStream = ByteString.newOutput();
752     proc.serializeStateData(stateStream);
753     if (stateStream.size() > 0) {
754       builder.setStateData(stateStream.toByteString());
755     }
756 
757     if (proc.getNonceKey() != null) {
758       builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
759       builder.setNonce(proc.getNonceKey().getNonce());
760     }
761 
762     return builder.build();
763   }
764 
765   /**
766    * Helper to convert the protobuf procedure.
767    * Used by ProcedureStore implementations.
768    *
769    * TODO: OPTIMIZATION: some of the field never change during the execution
770    *                     (e.g. className, procId, parentId, ...).
771    *                     We can split in 'data' and 'state', and the store
772    *                     may take advantage of it by storing the data only on insert().
773    */
774   @InterfaceAudience.Private
775   public static Procedure convert(final ProcedureProtos.Procedure proto)
776       throws IOException {
777     // Procedure from class name
778     Procedure proc = Procedure.newInstance(proto.getClassName());
779 
780     // set fields
781     proc.setProcId(proto.getProcId());
782     proc.setState(proto.getState());
783     proc.setStartTime(proto.getStartTime());
784     proc.setLastUpdate(proto.getLastUpdate());
785 
786     if (proto.hasParentId()) {
787       proc.setParentProcId(proto.getParentId());
788     }
789 
790     if (proto.hasOwner()) {
791       proc.setOwner(proto.getOwner());
792     }
793 
794     if (proto.hasTimeout()) {
795       proc.setTimeout(proto.getTimeout());
796     }
797 
798     if (proto.getStackIdCount() > 0) {
799       proc.setStackIndexes(proto.getStackIdList());
800     }
801 
802     if (proto.hasException()) {
803       assert proc.getState() == ProcedureState.FINISHED ||
804              proc.getState() == ProcedureState.ROLLEDBACK :
805              "The procedure must be failed (waiting to rollback) or rolledback";
806       proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
807     }
808 
809     if (proto.hasResult()) {
810       proc.setResult(proto.getResult().toByteArray());
811     }
812 
813     if (proto.getNonce() != HConstants.NO_NONCE) {
814       NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce());
815       proc.setNonceKey(nonceKey);
816     }
817 
818     // we want to call deserialize even when the stream is empty, mainly for testing.
819     proc.deserializeStateData(proto.getStateData().newInput());
820 
821     return proc;
822   }
823 }