001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.util.ArrayList;
021import java.util.HashSet;
022import java.util.List;
023import java.util.Set;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.yetus.audience.InterfaceStability;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
030
031/**
032 * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure".
033 * A "Root Procedure" is a Procedure without parent, each subprocedure will be
034 * added to the "Root Procedure" stack (or rollback-stack).
035 *
036 * RootProcedureState is used and managed only by the ProcedureExecutor.
037 *    Long rootProcId = getRootProcedureId(proc);
038 *    rollbackStack.get(rootProcId).acquire(proc)
039 *    rollbackStack.get(rootProcId).release(proc)
040 *    ...
041 */
042@InterfaceAudience.Private
043@InterfaceStability.Evolving
044class RootProcedureState<TEnvironment> {
045  private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);
046
047  private enum State {
048    RUNNING,         // The Procedure is running or ready to run
049    FAILED,          // The Procedure failed, waiting for the rollback executing
050    ROLLINGBACK,     // The Procedure failed and the execution was rolledback
051  }
052
053  private Set<Procedure<TEnvironment>> subprocs = null;
054  private ArrayList<Procedure<TEnvironment>> subprocStack = null;
055  private State state = State.RUNNING;
056  private int running = 0;
057
058  public synchronized boolean isFailed() {
059    switch (state) {
060      case ROLLINGBACK:
061      case FAILED:
062        return true;
063      default:
064        break;
065    }
066    return false;
067  }
068
069  public synchronized boolean isRollingback() {
070    return state == State.ROLLINGBACK;
071  }
072
073  /**
074   * Called by the ProcedureExecutor to mark rollback execution
075   */
076  protected synchronized boolean setRollback() {
077    if (running == 0 && state == State.FAILED) {
078      state = State.ROLLINGBACK;
079      return true;
080    }
081    return false;
082  }
083
084  /**
085   * Called by the ProcedureExecutor to mark rollback execution
086   */
087  protected synchronized void unsetRollback() {
088    assert state == State.ROLLINGBACK;
089    state = State.FAILED;
090  }
091
092  protected synchronized long[] getSubprocedureIds() {
093    if (subprocs == null) {
094      return null;
095    }
096    return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
097  }
098
099  protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() {
100    return subprocStack;
101  }
102
103  protected synchronized RemoteProcedureException getException() {
104    if (subprocStack != null) {
105      for (Procedure<TEnvironment> proc: subprocStack) {
106        if (proc.hasException()) {
107          return proc.getException();
108        }
109      }
110    }
111    return null;
112  }
113
114  /**
115   * Called by the ProcedureExecutor to mark the procedure step as running.
116   */
117  protected synchronized boolean acquire(Procedure<TEnvironment> proc) {
118    if (state != State.RUNNING) {
119      return false;
120    }
121
122    running++;
123    return true;
124  }
125
126  /**
127   * Called by the ProcedureExecutor to mark the procedure step as finished.
128   */
129  protected synchronized void release(Procedure<TEnvironment> proc) {
130    running--;
131  }
132
133  protected synchronized void abort() {
134    if (state == State.RUNNING) {
135      state = State.FAILED;
136    }
137  }
138
139  /**
140   * Called by the ProcedureExecutor after the procedure step is completed,
141   * to add the step to the rollback list (or procedure stack)
142   */
143  protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) {
144    if (proc.isFailed()) {
145      state = State.FAILED;
146    }
147    if (subprocStack == null) {
148      subprocStack = new ArrayList<>();
149    }
150    proc.addStackIndex(subprocStack.size());
151    LOG.debug("Add procedure {} as the {}th rollback step", proc, subprocStack.size());
152    subprocStack.add(proc);
153  }
154
155  protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) {
156    if (!proc.hasParent()) {
157      return;
158    }
159    if (subprocs == null) {
160      subprocs = new HashSet<>();
161    }
162    subprocs.add(proc);
163  }
164
165  /**
166   * Called on store load by the ProcedureExecutor to load part of the stack.
167   *
168   * Each procedure has its own stack-positions. Which means we have to write
169   * to the store only the Procedure we executed, and nothing else.
170   * on load we recreate the full stack by aggregating each procedure stack-positions.
171   */
172  protected synchronized void loadStack(Procedure<TEnvironment> proc) {
173    addSubProcedure(proc);
174    int[] stackIndexes = proc.getStackIndexes();
175    if (stackIndexes != null) {
176      if (subprocStack == null) {
177        subprocStack = new ArrayList<>();
178      }
179      int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size();
180      if (diff > 0) {
181        subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
182        while (diff-- > 0) {
183          subprocStack.add(null);
184        }
185      }
186      for (int i = 0; i < stackIndexes.length; ++i) {
187        subprocStack.set(stackIndexes[i], proc);
188      }
189    }
190    if (proc.getState() == ProcedureState.ROLLEDBACK) {
191      state = State.ROLLINGBACK;
192    } else if (proc.isFailed()) {
193      state = State.FAILED;
194    }
195  }
196
197  /**
198   * Called on store load by the ProcedureExecutor to validate the procedure stack.
199   */
200  protected synchronized boolean isValid() {
201    if (subprocStack != null) {
202      for (Procedure<TEnvironment> proc : subprocStack) {
203        if (proc == null) {
204          return false;
205        }
206      }
207    }
208    return true;
209  }
210}