001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.util.ArrayList; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.yetus.audience.InterfaceStability; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 031 032/** 033 * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure". 034 * A "Root Procedure" is a Procedure without parent, each subprocedure will be 035 * added to the "Root Procedure" stack (or rollback-stack). 036 * 037 * RootProcedureState is used and managed only by the ProcedureExecutor. 038 * Long rootProcId = getRootProcedureId(proc); 039 * rollbackStack.get(rootProcId).acquire(proc) 040 * rollbackStack.get(rootProcId).release(proc) 041 * ... 042 */ 043@InterfaceAudience.Private 044@InterfaceStability.Evolving 045class RootProcedureState<TEnvironment> { 046 047 private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class); 048 049 private enum State { 050 RUNNING, // The Procedure is running or ready to run 051 FAILED, // The Procedure failed, waiting for the rollback executing 052 ROLLINGBACK, // The Procedure failed and the execution was rolledback 053 } 054 055 private Set<Procedure<TEnvironment>> subprocs = null; 056 private ArrayList<Procedure<TEnvironment>> subprocStack = null; 057 private State state = State.RUNNING; 058 private int running = 0; 059 060 public synchronized boolean isFailed() { 061 switch (state) { 062 case ROLLINGBACK: 063 case FAILED: 064 return true; 065 default: 066 break; 067 } 068 return false; 069 } 070 071 public synchronized boolean isRollingback() { 072 return state == State.ROLLINGBACK; 073 } 074 075 /** 076 * Called by the ProcedureExecutor to mark rollback execution 077 */ 078 protected synchronized boolean setRollback() { 079 if (running == 0 && state == State.FAILED) { 080 state = State.ROLLINGBACK; 081 return true; 082 } 083 return false; 084 } 085 086 /** 087 * Called by the ProcedureExecutor to mark rollback execution 088 */ 089 protected synchronized void unsetRollback() { 090 assert state == State.ROLLINGBACK; 091 state = State.FAILED; 092 } 093 094 protected synchronized long[] getSubprocedureIds() { 095 if (subprocs == null) { 096 return null; 097 } 098 return subprocs.stream().mapToLong(Procedure::getProcId).toArray(); 099 } 100 101 protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() { 102 return subprocStack; 103 } 104 105 protected synchronized RemoteProcedureException getException() { 106 if (subprocStack != null) { 107 for (Procedure<TEnvironment> proc: subprocStack) { 108 if (proc.hasException()) { 109 return proc.getException(); 110 } 111 } 112 } 113 return null; 114 } 115 116 /** 117 * Called by the ProcedureExecutor to mark the procedure step as running. 118 */ 119 protected synchronized boolean acquire(Procedure<TEnvironment> proc) { 120 if (state != State.RUNNING) { 121 return false; 122 } 123 124 running++; 125 return true; 126 } 127 128 /** 129 * Called by the ProcedureExecutor to mark the procedure step as finished. 130 */ 131 protected synchronized void release(Procedure<TEnvironment> proc) { 132 running--; 133 } 134 135 protected synchronized void abort() { 136 if (state == State.RUNNING) { 137 state = State.FAILED; 138 } 139 } 140 141 /** 142 * Called by the ProcedureExecutor after the procedure step is completed, 143 * to add the step to the rollback list (or procedure stack) 144 */ 145 protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) { 146 if (proc.isFailed()) { 147 state = State.FAILED; 148 } 149 if (subprocStack == null) { 150 subprocStack = new ArrayList<>(); 151 } 152 proc.addStackIndex(subprocStack.size()); 153 LOG.debug("Add procedure {} as the {}th rollback step", proc, subprocStack.size()); 154 subprocStack.add(proc); 155 } 156 157 protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) { 158 if (!proc.hasParent()) { 159 return; 160 } 161 if (subprocs == null) { 162 subprocs = new HashSet<>(); 163 } 164 subprocs.add(proc); 165 } 166 167 /** 168 * Called on store load by the ProcedureExecutor to load part of the stack. 169 * 170 * Each procedure has its own stack-positions. Which means we have to write 171 * to the store only the Procedure we executed, and nothing else. 172 * on load we recreate the full stack by aggregating each procedure stack-positions. 173 */ 174 protected synchronized void loadStack(Procedure<TEnvironment> proc) { 175 addSubProcedure(proc); 176 int[] stackIndexes = proc.getStackIndexes(); 177 if (stackIndexes != null) { 178 if (subprocStack == null) { 179 subprocStack = new ArrayList<>(); 180 } 181 int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size(); 182 if (diff > 0) { 183 subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]); 184 while (diff-- > 0) subprocStack.add(null); 185 } 186 for (int i = 0; i < stackIndexes.length; ++i) { 187 subprocStack.set(stackIndexes[i], proc); 188 } 189 } 190 if (proc.getState() == ProcedureState.ROLLEDBACK) { 191 state = State.ROLLINGBACK; 192 } else if (proc.isFailed()) { 193 state = State.FAILED; 194 } 195 } 196 197 /** 198 * Called on store load by the ProcedureExecutor to validate the procedure stack. 199 */ 200 protected synchronized boolean isValid() { 201 if (subprocStack != null) { 202 for (Procedure<TEnvironment> proc : subprocStack) { 203 if (proc == null) { 204 return false; 205 } 206 } 207 } 208 return true; 209 } 210}