001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.yetus.audience.InterfaceStability;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
031
032/**
033 * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure".
034 * A "Root Procedure" is a Procedure without parent, each subprocedure will be
035 * added to the "Root Procedure" stack (or rollback-stack).
036 *
037 * RootProcedureState is used and managed only by the ProcedureExecutor.
038 *    Long rootProcId = getRootProcedureId(proc);
039 *    rollbackStack.get(rootProcId).acquire(proc)
040 *    rollbackStack.get(rootProcId).release(proc)
041 *    ...
042 */
043@InterfaceAudience.Private
044@InterfaceStability.Evolving
045class RootProcedureState<TEnvironment> {
046
047  private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);
048
049  private enum State {
050    RUNNING,         // The Procedure is running or ready to run
051    FAILED,          // The Procedure failed, waiting for the rollback executing
052    ROLLINGBACK,     // The Procedure failed and the execution was rolledback
053  }
054
055  private Set<Procedure<TEnvironment>> subprocs = null;
056  private ArrayList<Procedure<TEnvironment>> subprocStack = null;
057  private State state = State.RUNNING;
058  private int running = 0;
059
060  public synchronized boolean isFailed() {
061    switch (state) {
062      case ROLLINGBACK:
063      case FAILED:
064        return true;
065      default:
066        break;
067    }
068    return false;
069  }
070
071  public synchronized boolean isRollingback() {
072    return state == State.ROLLINGBACK;
073  }
074
075  /**
076   * Called by the ProcedureExecutor to mark rollback execution
077   */
078  protected synchronized boolean setRollback() {
079    if (running == 0 && state == State.FAILED) {
080      state = State.ROLLINGBACK;
081      return true;
082    }
083    return false;
084  }
085
086  /**
087   * Called by the ProcedureExecutor to mark rollback execution
088   */
089  protected synchronized void unsetRollback() {
090    assert state == State.ROLLINGBACK;
091    state = State.FAILED;
092  }
093
094  protected synchronized long[] getSubprocedureIds() {
095    if (subprocs == null) {
096      return null;
097    }
098    return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
099  }
100
101  protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() {
102    return subprocStack;
103  }
104
105  protected synchronized RemoteProcedureException getException() {
106    if (subprocStack != null) {
107      for (Procedure<TEnvironment> proc: subprocStack) {
108        if (proc.hasException()) {
109          return proc.getException();
110        }
111      }
112    }
113    return null;
114  }
115
116  /**
117   * Called by the ProcedureExecutor to mark the procedure step as running.
118   */
119  protected synchronized boolean acquire(Procedure<TEnvironment> proc) {
120    if (state != State.RUNNING) {
121      return false;
122    }
123
124    running++;
125    return true;
126  }
127
128  /**
129   * Called by the ProcedureExecutor to mark the procedure step as finished.
130   */
131  protected synchronized void release(Procedure<TEnvironment> proc) {
132    running--;
133  }
134
135  protected synchronized void abort() {
136    if (state == State.RUNNING) {
137      state = State.FAILED;
138    }
139  }
140
141  /**
142   * Called by the ProcedureExecutor after the procedure step is completed,
143   * to add the step to the rollback list (or procedure stack)
144   */
145  protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) {
146    if (proc.isFailed()) {
147      state = State.FAILED;
148    }
149    if (subprocStack == null) {
150      subprocStack = new ArrayList<>();
151    }
152    proc.addStackIndex(subprocStack.size());
153    LOG.debug("Add procedure {} as the {}th rollback step", proc, subprocStack.size());
154    subprocStack.add(proc);
155  }
156
157  protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) {
158    if (!proc.hasParent()) {
159      return;
160    }
161    if (subprocs == null) {
162      subprocs = new HashSet<>();
163    }
164    subprocs.add(proc);
165  }
166
167  /**
168   * Called on store load by the ProcedureExecutor to load part of the stack.
169   *
170   * Each procedure has its own stack-positions. Which means we have to write
171   * to the store only the Procedure we executed, and nothing else.
172   * on load we recreate the full stack by aggregating each procedure stack-positions.
173   */
174  protected synchronized void loadStack(Procedure<TEnvironment> proc) {
175    addSubProcedure(proc);
176    int[] stackIndexes = proc.getStackIndexes();
177    if (stackIndexes != null) {
178      if (subprocStack == null) {
179        subprocStack = new ArrayList<>();
180      }
181      int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size();
182      if (diff > 0) {
183        subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
184        while (diff-- > 0) subprocStack.add(null);
185      }
186      for (int i = 0; i < stackIndexes.length; ++i) {
187        subprocStack.set(stackIndexes[i], proc);
188      }
189    }
190    if (proc.getState() == ProcedureState.ROLLEDBACK) {
191      state = State.ROLLINGBACK;
192    } else if (proc.isFailed()) {
193      state = State.FAILED;
194    }
195  }
196
197  /**
198   * Called on store load by the ProcedureExecutor to validate the procedure stack.
199   */
200  protected synchronized boolean isValid() {
201    if (subprocStack != null) {
202      for (Procedure<TEnvironment> proc : subprocStack) {
203        if (proc == null) {
204          return false;
205        }
206      }
207    }
208    return true;
209  }
210}