001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.util.ArrayList;
021import java.util.HashSet;
022import java.util.List;
023import java.util.Set;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.yetus.audience.InterfaceStability;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
030
031/**
032 * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure". A "Root
033 * Procedure" is a Procedure without parent, each subprocedure will be added to the "Root Procedure"
034 * stack (or rollback-stack). RootProcedureState is used and managed only by the ProcedureExecutor.
035 * Long rootProcId = getRootProcedureId(proc); rollbackStack.get(rootProcId).acquire(proc)
036 * rollbackStack.get(rootProcId).release(proc) ...
037 */
038@InterfaceAudience.Private
039@InterfaceStability.Evolving
040class RootProcedureState<TEnvironment> {
041  private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);
042
043  private enum State {
044    RUNNING, // The Procedure is running or ready to run
045    FAILED, // The Procedure failed, waiting for the rollback executing
046    ROLLINGBACK, // The Procedure failed and the execution was rolledback
047  }
048
049  private Set<Procedure<TEnvironment>> subprocs = null;
050  private ArrayList<Procedure<TEnvironment>> subprocStack = null;
051  private State state = State.RUNNING;
052  private int running = 0;
053
054  public synchronized boolean isFailed() {
055    switch (state) {
056      case ROLLINGBACK:
057      case FAILED:
058        return true;
059      default:
060        break;
061    }
062    return false;
063  }
064
065  public synchronized boolean isRollingback() {
066    return state == State.ROLLINGBACK;
067  }
068
069  /**
070   * Called by the ProcedureExecutor to mark rollback execution
071   */
072  protected synchronized boolean setRollback() {
073    if (running == 0 && state == State.FAILED) {
074      state = State.ROLLINGBACK;
075      return true;
076    }
077    return false;
078  }
079
080  /**
081   * Called by the ProcedureExecutor to mark rollback execution
082   */
083  protected synchronized void unsetRollback() {
084    assert state == State.ROLLINGBACK;
085    state = State.FAILED;
086  }
087
088  protected synchronized long[] getSubprocedureIds() {
089    if (subprocs == null) {
090      return null;
091    }
092    return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
093  }
094
095  protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() {
096    return subprocStack;
097  }
098
099  protected synchronized RemoteProcedureException getException() {
100    if (subprocStack != null) {
101      for (Procedure<TEnvironment> proc : subprocStack) {
102        if (proc.hasException()) {
103          return proc.getException();
104        }
105      }
106    }
107    return null;
108  }
109
110  /**
111   * Called by the ProcedureExecutor to mark the procedure step as running.
112   */
113  protected synchronized boolean acquire(Procedure<TEnvironment> proc) {
114    if (state != State.RUNNING) {
115      return false;
116    }
117
118    running++;
119    return true;
120  }
121
122  /**
123   * Called by the ProcedureExecutor to mark the procedure step as finished.
124   */
125  protected synchronized void release(Procedure<TEnvironment> proc) {
126    running--;
127  }
128
129  protected synchronized void abort() {
130    if (state == State.RUNNING) {
131      state = State.FAILED;
132    }
133  }
134
135  /**
136   * Called by the ProcedureExecutor after the procedure step is completed, to add the step to the
137   * rollback list (or procedure stack)
138   */
139  protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) {
140    if (proc.isFailed()) {
141      state = State.FAILED;
142    }
143    if (subprocStack == null) {
144      subprocStack = new ArrayList<>();
145    }
146    proc.addStackIndex(subprocStack.size());
147    LOG.trace("Add procedure {} as the {}th rollback step", proc, subprocStack.size());
148    subprocStack.add(proc);
149  }
150
151  protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) {
152    if (!proc.hasParent()) {
153      return;
154    }
155    if (subprocs == null) {
156      subprocs = new HashSet<>();
157    }
158    subprocs.add(proc);
159  }
160
161  /**
162   * Called on store load by the ProcedureExecutor to load part of the stack. Each procedure has its
163   * own stack-positions. Which means we have to write to the store only the Procedure we executed,
164   * and nothing else. on load we recreate the full stack by aggregating each procedure
165   * stack-positions.
166   */
167  protected synchronized void loadStack(Procedure<TEnvironment> proc) {
168    addSubProcedure(proc);
169    int[] stackIndexes = proc.getStackIndexes();
170    if (stackIndexes != null) {
171      if (subprocStack == null) {
172        subprocStack = new ArrayList<>();
173      }
174      int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size();
175      if (diff > 0) {
176        subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
177        while (diff-- > 0) {
178          subprocStack.add(null);
179        }
180      }
181      for (int i = 0; i < stackIndexes.length; ++i) {
182        subprocStack.set(stackIndexes[i], proc);
183      }
184    }
185    if (proc.getState() == ProcedureState.ROLLEDBACK) {
186      state = State.ROLLINGBACK;
187    } else if (proc.isFailed()) {
188      state = State.FAILED;
189    }
190  }
191
192  /**
193   * Called on store load by the ProcedureExecutor to validate the procedure stack.
194   */
195  protected synchronized boolean isValid() {
196    if (subprocStack != null) {
197      for (Procedure<TEnvironment> proc : subprocStack) {
198        if (proc == null) {
199          return false;
200        }
201      }
202    }
203    return true;
204  }
205}