001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.List; 025import java.util.concurrent.atomic.AtomicBoolean; 026 027import org.apache.yetus.audience.InterfaceAudience; 028import org.apache.yetus.audience.InterfaceStability; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 033 034import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData; 035 036/** 037 * Procedure described by a series of steps. 038 * 039 * <p>The procedure implementor must have an enum of 'states', describing 040 * the various step of the procedure. 041 * Once the procedure is running, the procedure-framework will call executeFromState() 042 * using the 'state' provided by the user. The first call to executeFromState() 043 * will be performed with 'state = null'. The implementor can jump between 044 * states using setNextState(MyStateEnum.ordinal()). 045 * The rollback will call rollbackState() for each state that was executed, in reverse order. 046 */ 047@InterfaceAudience.Private 048@InterfaceStability.Evolving 049public abstract class StateMachineProcedure<TEnvironment, TState> 050 extends Procedure<TEnvironment> { 051 private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class); 052 053 private static final int EOF_STATE = Integer.MIN_VALUE; 054 055 private final AtomicBoolean aborted = new AtomicBoolean(false); 056 057 private Flow stateFlow = Flow.HAS_MORE_STATE; 058 protected int stateCount = 0; 059 private int[] states = null; 060 061 private List<Procedure<TEnvironment>> subProcList = null; 062 063 protected final int getCycles() { 064 return cycles; 065 } 066 067 /** 068 * Cycles on same state. Good for figuring if we are stuck. 069 */ 070 private int cycles = 0; 071 072 /** 073 * Ordinal of the previous state. So we can tell if we are progressing or not. 074 */ 075 private int previousState; 076 077 @VisibleForTesting 078 public enum Flow { 079 HAS_MORE_STATE, 080 NO_MORE_STATE, 081 } 082 083 /** 084 * called to perform a single step of the specified 'state' of the procedure 085 * @param state state to execute 086 * @return Flow.NO_MORE_STATE if the procedure is completed, 087 * Flow.HAS_MORE_STATE if there is another step. 088 */ 089 protected abstract Flow executeFromState(TEnvironment env, TState state) 090 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; 091 092 /** 093 * called to perform the rollback of the specified state 094 * @param state state to rollback 095 * @throws IOException temporary failure, the rollback will retry later 096 */ 097 protected abstract void rollbackState(TEnvironment env, TState state) 098 throws IOException, InterruptedException; 099 100 /** 101 * Convert an ordinal (or state id) to an Enum (or more descriptive) state object. 102 * @param stateId the ordinal() of the state enum (or state id) 103 * @return the state enum object 104 */ 105 protected abstract TState getState(int stateId); 106 107 /** 108 * Convert the Enum (or more descriptive) state object to an ordinal (or state id). 109 * @param state the state enum object 110 * @return stateId the ordinal() of the state enum (or state id) 111 */ 112 protected abstract int getStateId(TState state); 113 114 /** 115 * Return the initial state object that will be used for the first call to executeFromState(). 116 * @return the initial state enum object 117 */ 118 protected abstract TState getInitialState(); 119 120 /** 121 * Set the next state for the procedure. 122 * @param state the state enum object 123 */ 124 protected void setNextState(final TState state) { 125 setNextState(getStateId(state)); 126 failIfAborted(); 127 } 128 129 /** 130 * By default, the executor will try ro run all the steps of the procedure start to finish. 131 * Return true to make the executor yield between execution steps to 132 * give other procedures time to run their steps. 133 * @param state the state we are going to execute next. 134 * @return Return true if the executor should yield before the execution of the specified step. 135 * Defaults to return false. 136 */ 137 protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) { 138 return false; 139 } 140 141 /** 142 * Add a child procedure to execute 143 * @param subProcedure the child procedure 144 */ 145 protected void addChildProcedure(Procedure<TEnvironment>... subProcedure) { 146 if (subProcedure == null) return; 147 final int len = subProcedure.length; 148 if (len == 0) return; 149 if (subProcList == null) { 150 subProcList = new ArrayList<>(len); 151 } 152 for (int i = 0; i < len; ++i) { 153 Procedure<TEnvironment> proc = subProcedure[i]; 154 if (!proc.hasOwner()) proc.setOwner(getOwner()); 155 subProcList.add(proc); 156 } 157 } 158 159 @Override 160 protected Procedure[] execute(final TEnvironment env) 161 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 162 updateTimestamp(); 163 try { 164 failIfAborted(); 165 166 if (!hasMoreState() || isFailed()) return null; 167 TState state = getCurrentState(); 168 if (stateCount == 0) { 169 setNextState(getStateId(state)); 170 } 171 172 if (LOG.isTraceEnabled()) { 173 LOG.trace(state + " " + this + "; cycles=" + this.cycles); 174 } 175 // Keep running count of cycles 176 if (getStateId(state) != this.previousState) { 177 this.previousState = getStateId(state); 178 this.cycles = 0; 179 } else { 180 this.cycles++; 181 } 182 183 LOG.trace("{}", toString()); 184 stateFlow = executeFromState(env, state); 185 if (!hasMoreState()) setNextState(EOF_STATE); 186 if (subProcList != null && !subProcList.isEmpty()) { 187 Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); 188 subProcList = null; 189 return subProcedures; 190 } 191 return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this}; 192 } finally { 193 updateTimestamp(); 194 } 195 } 196 197 @Override 198 protected void rollback(final TEnvironment env) 199 throws IOException, InterruptedException { 200 if (isEofState()) stateCount--; 201 try { 202 updateTimestamp(); 203 rollbackState(env, getCurrentState()); 204 } finally { 205 stateCount--; 206 updateTimestamp(); 207 } 208 } 209 210 protected boolean isEofState() { 211 return stateCount > 0 && states[stateCount-1] == EOF_STATE; 212 } 213 214 @Override 215 protected boolean abort(final TEnvironment env) { 216 LOG.debug("Abort requested for {}", this); 217 if (!hasMoreState()) { 218 LOG.warn("Ignore abort request on {} because it has already been finished", this); 219 return false; 220 } 221 if (!isRollbackSupported(getCurrentState())) { 222 LOG.warn("Ignore abort request on {} because it does not support rollback", this); 223 return false; 224 } 225 aborted.set(true); 226 return true; 227 } 228 229 /** 230 * If procedure has more states then abort it otherwise procedure is finished and abort can be 231 * ignored. 232 */ 233 protected final void failIfAborted() { 234 if (aborted.get()) { 235 if (hasMoreState()) { 236 setAbortFailure(getClass().getSimpleName(), "abort requested"); 237 } else { 238 LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this); 239 } 240 } 241 } 242 243 /** 244 * Used by the default implementation of abort() to know if the current state can be aborted 245 * and rollback can be triggered. 246 */ 247 protected boolean isRollbackSupported(final TState state) { 248 return false; 249 } 250 251 @Override 252 protected boolean isYieldAfterExecutionStep(final TEnvironment env) { 253 return isYieldBeforeExecuteFromState(env, getCurrentState()); 254 } 255 256 private boolean hasMoreState() { 257 return stateFlow != Flow.NO_MORE_STATE; 258 } 259 260 protected TState getCurrentState() { 261 return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState(); 262 } 263 264 /** 265 * This method is used from test code as it cannot be assumed that state transition will happen 266 * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in 267 * future. 268 */ 269 @VisibleForTesting 270 public int getCurrentStateId() { 271 return getStateId(getCurrentState()); 272 } 273 274 /** 275 * Set the next state for the procedure. 276 * @param stateId the ordinal() of the state enum (or state id) 277 */ 278 private void setNextState(final int stateId) { 279 if (states == null || states.length == stateCount) { 280 int newCapacity = stateCount + 8; 281 if (states != null) { 282 states = Arrays.copyOf(states, newCapacity); 283 } else { 284 states = new int[newCapacity]; 285 } 286 } 287 states[stateCount++] = stateId; 288 } 289 290 @Override 291 protected void toStringState(StringBuilder builder) { 292 super.toStringState(builder); 293 if (!isFinished() && !isEofState() && getCurrentState() != null) { 294 builder.append(":").append(getCurrentState()); 295 } 296 } 297 298 @Override 299 protected void serializeStateData(ProcedureStateSerializer serializer) 300 throws IOException { 301 StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder(); 302 for (int i = 0; i < stateCount; ++i) { 303 data.addState(states[i]); 304 } 305 serializer.serialize(data.build()); 306 } 307 308 @Override 309 protected void deserializeStateData(ProcedureStateSerializer serializer) 310 throws IOException { 311 StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class); 312 stateCount = data.getStateCount(); 313 if (stateCount > 0) { 314 states = new int[stateCount]; 315 for (int i = 0; i < stateCount; ++i) { 316 states[i] = data.getState(i); 317 } 318 if (isEofState()) { 319 stateFlow = Flow.NO_MORE_STATE; 320 } 321 } else { 322 states = null; 323 } 324 } 325}