001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicBoolean; 025 026import org.apache.yetus.audience.InterfaceAudience; 027import org.apache.yetus.audience.InterfaceStability; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData; 032 033/** 034 * Procedure described by a series of steps. 035 * 036 * <p>The procedure implementor must have an enum of 'states', describing 037 * the various step of the procedure. 038 * Once the procedure is running, the procedure-framework will call executeFromState() 039 * using the 'state' provided by the user. The first call to executeFromState() 040 * will be performed with 'state = null'. The implementor can jump between 041 * states using setNextState(MyStateEnum.ordinal()). 042 * The rollback will call rollbackState() for each state that was executed, in reverse order. 043 */ 044@InterfaceAudience.Private 045@InterfaceStability.Evolving 046public abstract class StateMachineProcedure<TEnvironment, TState> 047 extends Procedure<TEnvironment> { 048 private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class); 049 050 private static final int EOF_STATE = Integer.MIN_VALUE; 051 052 private final AtomicBoolean aborted = new AtomicBoolean(false); 053 054 private Flow stateFlow = Flow.HAS_MORE_STATE; 055 protected int stateCount = 0; 056 private int[] states = null; 057 058 private List<Procedure<TEnvironment>> subProcList = null; 059 060 protected final int getCycles() { 061 return cycles; 062 } 063 064 /** 065 * Cycles on same state. Good for figuring if we are stuck. 066 */ 067 private int cycles = 0; 068 069 /** 070 * Ordinal of the previous state. So we can tell if we are progressing or not. 071 */ 072 private int previousState; 073 074 public enum Flow { 075 HAS_MORE_STATE, 076 NO_MORE_STATE, 077 } 078 079 /** 080 * called to perform a single step of the specified 'state' of the procedure 081 * @param state state to execute 082 * @return Flow.NO_MORE_STATE if the procedure is completed, 083 * Flow.HAS_MORE_STATE if there is another step. 084 */ 085 protected abstract Flow executeFromState(TEnvironment env, TState state) 086 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; 087 088 /** 089 * called to perform the rollback of the specified state 090 * @param state state to rollback 091 * @throws IOException temporary failure, the rollback will retry later 092 */ 093 protected abstract void rollbackState(TEnvironment env, TState state) 094 throws IOException, InterruptedException; 095 096 /** 097 * Convert an ordinal (or state id) to an Enum (or more descriptive) state object. 098 * @param stateId the ordinal() of the state enum (or state id) 099 * @return the state enum object 100 */ 101 protected abstract TState getState(int stateId); 102 103 /** 104 * Convert the Enum (or more descriptive) state object to an ordinal (or state id). 105 * @param state the state enum object 106 * @return stateId the ordinal() of the state enum (or state id) 107 */ 108 protected abstract int getStateId(TState state); 109 110 /** 111 * Return the initial state object that will be used for the first call to executeFromState(). 112 * @return the initial state enum object 113 */ 114 protected abstract TState getInitialState(); 115 116 /** 117 * Set the next state for the procedure. 118 * @param state the state enum object 119 */ 120 protected void setNextState(final TState state) { 121 setNextState(getStateId(state)); 122 failIfAborted(); 123 } 124 125 /** 126 * By default, the executor will try ro run all the steps of the procedure start to finish. 127 * Return true to make the executor yield between execution steps to 128 * give other procedures time to run their steps. 129 * @param state the state we are going to execute next. 130 * @return Return true if the executor should yield before the execution of the specified step. 131 * Defaults to return false. 132 */ 133 protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) { 134 return false; 135 } 136 137 /** 138 * Add a child procedure to execute 139 * @param subProcedure the child procedure 140 */ 141 protected <T extends Procedure<TEnvironment>> void addChildProcedure( 142 @SuppressWarnings("unchecked") T... subProcedure) { 143 if (subProcedure == null) { 144 return; 145 } 146 final int len = subProcedure.length; 147 if (len == 0) { 148 return; 149 } 150 if (subProcList == null) { 151 subProcList = new ArrayList<>(len); 152 } 153 for (int i = 0; i < len; ++i) { 154 Procedure<TEnvironment> proc = subProcedure[i]; 155 if (!proc.hasOwner()) { 156 proc.setOwner(getOwner()); 157 } 158 159 subProcList.add(proc); 160 } 161 } 162 163 @Override 164 protected Procedure[] execute(final TEnvironment env) 165 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 166 updateTimestamp(); 167 try { 168 failIfAborted(); 169 170 if (!hasMoreState() || isFailed()) { 171 return null; 172 } 173 174 TState state = getCurrentState(); 175 if (stateCount == 0) { 176 setNextState(getStateId(state)); 177 } 178 179 if (LOG.isTraceEnabled()) { 180 LOG.trace(state + " " + this + "; cycles=" + this.cycles); 181 } 182 // Keep running count of cycles 183 if (getStateId(state) != this.previousState) { 184 this.previousState = getStateId(state); 185 this.cycles = 0; 186 } else { 187 this.cycles++; 188 } 189 190 LOG.trace("{}", this); 191 stateFlow = executeFromState(env, state); 192 if (!hasMoreState()) { 193 setNextState(EOF_STATE); 194 } 195 196 if (subProcList != null && !subProcList.isEmpty()) { 197 Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); 198 subProcList = null; 199 return subProcedures; 200 } 201 return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this}; 202 } finally { 203 updateTimestamp(); 204 } 205 } 206 207 @Override 208 protected void rollback(final TEnvironment env) 209 throws IOException, InterruptedException { 210 if (isEofState()) { 211 stateCount--; 212 } 213 214 try { 215 updateTimestamp(); 216 rollbackState(env, getCurrentState()); 217 } finally { 218 stateCount--; 219 updateTimestamp(); 220 } 221 } 222 223 protected boolean isEofState() { 224 return stateCount > 0 && states[stateCount-1] == EOF_STATE; 225 } 226 227 @Override 228 protected boolean abort(final TEnvironment env) { 229 LOG.debug("Abort requested for {}", this); 230 if (!hasMoreState()) { 231 LOG.warn("Ignore abort request on {} because it has already been finished", this); 232 return false; 233 } 234 if (!isRollbackSupported(getCurrentState())) { 235 LOG.warn("Ignore abort request on {} because it does not support rollback", this); 236 return false; 237 } 238 aborted.set(true); 239 return true; 240 } 241 242 /** 243 * If procedure has more states then abort it otherwise procedure is finished and abort can be 244 * ignored. 245 */ 246 protected final void failIfAborted() { 247 if (aborted.get()) { 248 if (hasMoreState()) { 249 setAbortFailure(getClass().getSimpleName(), "abort requested"); 250 } else { 251 LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this); 252 } 253 } 254 } 255 256 /** 257 * Used by the default implementation of abort() to know if the current state can be aborted 258 * and rollback can be triggered. 259 */ 260 protected boolean isRollbackSupported(final TState state) { 261 return false; 262 } 263 264 @Override 265 protected boolean isYieldAfterExecutionStep(final TEnvironment env) { 266 return isYieldBeforeExecuteFromState(env, getCurrentState()); 267 } 268 269 private boolean hasMoreState() { 270 return stateFlow != Flow.NO_MORE_STATE; 271 } 272 273 protected TState getCurrentState() { 274 return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState(); 275 } 276 277 /** 278 * This method is used from test code as it cannot be assumed that state transition will happen 279 * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in 280 * future. 281 */ 282 public int getCurrentStateId() { 283 return getStateId(getCurrentState()); 284 } 285 286 /** 287 * Set the next state for the procedure. 288 * @param stateId the ordinal() of the state enum (or state id) 289 */ 290 private void setNextState(final int stateId) { 291 if (states == null || states.length == stateCount) { 292 int newCapacity = stateCount + 8; 293 if (states != null) { 294 states = Arrays.copyOf(states, newCapacity); 295 } else { 296 states = new int[newCapacity]; 297 } 298 } 299 states[stateCount++] = stateId; 300 } 301 302 @Override 303 protected void toStringState(StringBuilder builder) { 304 super.toStringState(builder); 305 if (!isFinished() && !isEofState() && getCurrentState() != null) { 306 builder.append(":").append(getCurrentState()); 307 } 308 } 309 310 @Override 311 protected void serializeStateData(ProcedureStateSerializer serializer) 312 throws IOException { 313 StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder(); 314 for (int i = 0; i < stateCount; ++i) { 315 data.addState(states[i]); 316 } 317 serializer.serialize(data.build()); 318 } 319 320 @Override 321 protected void deserializeStateData(ProcedureStateSerializer serializer) 322 throws IOException { 323 StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class); 324 stateCount = data.getStateCount(); 325 if (stateCount > 0) { 326 states = new int[stateCount]; 327 for (int i = 0; i < stateCount; ++i) { 328 states[i] = data.getState(i); 329 } 330 if (isEofState()) { 331 stateFlow = Flow.NO_MORE_STATE; 332 } 333 } else { 334 states = null; 335 } 336 } 337}