001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicBoolean; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.yetus.audience.InterfaceStability; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData; 031 032/** 033 * Procedure described by a series of steps. 034 * <p> 035 * The procedure implementor must have an enum of 'states', describing the various step of the 036 * procedure. Once the procedure is running, the procedure-framework will call executeFromState() 037 * using the 'state' provided by the user. The first call to executeFromState() will be performed 038 * with 'state = null'. The implementor can jump between states using 039 * setNextState(MyStateEnum.ordinal()). The rollback will call rollbackState() for each state that 040 * was executed, in reverse order. 041 */ 042@InterfaceAudience.Private 043@InterfaceStability.Evolving 044public abstract class StateMachineProcedure<TEnvironment, TState> extends Procedure<TEnvironment> { 045 private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class); 046 047 private static final int EOF_STATE = Integer.MIN_VALUE; 048 049 private final AtomicBoolean aborted = new AtomicBoolean(false); 050 051 private Flow stateFlow = Flow.HAS_MORE_STATE; 052 protected int stateCount = 0; 053 private int[] states = null; 054 055 private List<Procedure<TEnvironment>> subProcList = null; 056 057 protected final int getCycles() { 058 return cycles; 059 } 060 061 /** 062 * Cycles on same state. Good for figuring if we are stuck. 063 */ 064 private int cycles = 0; 065 066 /** 067 * Ordinal of the previous state. So we can tell if we are progressing or not. 068 */ 069 private int previousState; 070 071 public enum Flow { 072 HAS_MORE_STATE, 073 NO_MORE_STATE, 074 } 075 076 /** 077 * called to perform a single step of the specified 'state' of the procedure 078 * @param state state to execute 079 * @return Flow.NO_MORE_STATE if the procedure is completed, Flow.HAS_MORE_STATE if there is 080 * another step. 081 */ 082 protected abstract Flow executeFromState(TEnvironment env, TState state) 083 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; 084 085 /** 086 * called to perform the rollback of the specified state 087 * @param state state to rollback 088 * @throws IOException temporary failure, the rollback will retry later 089 */ 090 protected abstract void rollbackState(TEnvironment env, TState state) 091 throws IOException, InterruptedException; 092 093 /** 094 * Convert an ordinal (or state id) to an Enum (or more descriptive) state object. 095 * @param stateId the ordinal() of the state enum (or state id) 096 * @return the state enum object 097 */ 098 protected abstract TState getState(int stateId); 099 100 /** 101 * Convert the Enum (or more descriptive) state object to an ordinal (or state id). 102 * @param state the state enum object 103 * @return stateId the ordinal() of the state enum (or state id) 104 */ 105 protected abstract int getStateId(TState state); 106 107 /** 108 * Return the initial state object that will be used for the first call to executeFromState(). 109 * @return the initial state enum object 110 */ 111 protected abstract TState getInitialState(); 112 113 /** 114 * Set the next state for the procedure. 115 * @param state the state enum object 116 */ 117 protected void setNextState(final TState state) { 118 setNextState(getStateId(state)); 119 failIfAborted(); 120 } 121 122 /** 123 * By default, the executor will try ro run all the steps of the procedure start to finish. Return 124 * true to make the executor yield between execution steps to give other procedures time to run 125 * their steps. 126 * @param state the state we are going to execute next. 127 * @return Return true if the executor should yield before the execution of the specified step. 128 * Defaults to return false. 129 */ 130 protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) { 131 return false; 132 } 133 134 /** 135 * Add a child procedure to execute 136 * @param subProcedure the child procedure 137 */ 138 protected <T extends Procedure<TEnvironment>> void 139 addChildProcedure(@SuppressWarnings("unchecked") T... subProcedure) { 140 if (subProcedure == null) { 141 return; 142 } 143 final int len = subProcedure.length; 144 if (len == 0) { 145 return; 146 } 147 if (subProcList == null) { 148 subProcList = new ArrayList<>(len); 149 } 150 for (int i = 0; i < len; ++i) { 151 Procedure<TEnvironment> proc = subProcedure[i]; 152 if (!proc.hasOwner()) { 153 proc.setOwner(getOwner()); 154 } 155 156 subProcList.add(proc); 157 } 158 } 159 160 @Override 161 protected Procedure[] execute(final TEnvironment env) 162 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 163 updateTimestamp(); 164 try { 165 failIfAborted(); 166 167 if (!hasMoreState() || isFailed()) { 168 return null; 169 } 170 171 TState state = getCurrentState(); 172 if (stateCount == 0) { 173 setNextState(getStateId(state)); 174 } 175 176 if (LOG.isTraceEnabled()) { 177 LOG.trace(state + " " + this + "; cycles=" + this.cycles); 178 } 179 // Keep running count of cycles 180 if (getStateId(state) != this.previousState) { 181 this.previousState = getStateId(state); 182 this.cycles = 0; 183 } else { 184 this.cycles++; 185 } 186 187 LOG.trace("{}", this); 188 stateFlow = executeFromState(env, state); 189 if (!hasMoreState()) { 190 setNextState(EOF_STATE); 191 } 192 193 if (subProcList != null && !subProcList.isEmpty()) { 194 Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); 195 subProcList = null; 196 return subProcedures; 197 } 198 return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] { this }; 199 } finally { 200 updateTimestamp(); 201 } 202 } 203 204 @Override 205 protected void rollback(final TEnvironment env) throws IOException, InterruptedException { 206 if (isEofState()) { 207 stateCount--; 208 } 209 210 try { 211 updateTimestamp(); 212 rollbackState(env, getCurrentState()); 213 } finally { 214 stateCount--; 215 updateTimestamp(); 216 } 217 } 218 219 protected boolean isEofState() { 220 return stateCount > 0 && states[stateCount - 1] == EOF_STATE; 221 } 222 223 @Override 224 protected boolean abort(final TEnvironment env) { 225 LOG.debug("Abort requested for {}", this); 226 if (!hasMoreState()) { 227 LOG.warn("Ignore abort request on {} because it has already been finished", this); 228 return false; 229 } 230 if (!isRollbackSupported(getCurrentState())) { 231 LOG.warn("Ignore abort request on {} because it does not support rollback", this); 232 return false; 233 } 234 aborted.set(true); 235 return true; 236 } 237 238 /** 239 * If procedure has more states then abort it otherwise procedure is finished and abort can be 240 * ignored. 241 */ 242 protected final void failIfAborted() { 243 if (aborted.get()) { 244 if (hasMoreState()) { 245 setAbortFailure(getClass().getSimpleName(), "abort requested"); 246 } else { 247 LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this); 248 } 249 } 250 } 251 252 @Override 253 protected final boolean isRollbackSupported() { 254 return isRollbackSupported(getCurrentState()); 255 } 256 257 /** 258 * Used by the default implementation of abort() to know if the current state can be aborted and 259 * rollback can be triggered. 260 */ 261 protected boolean isRollbackSupported(final TState state) { 262 return false; 263 } 264 265 @Override 266 protected boolean isYieldAfterExecutionStep(final TEnvironment env) { 267 return isYieldBeforeExecuteFromState(env, getCurrentState()); 268 } 269 270 private boolean hasMoreState() { 271 return stateFlow != Flow.NO_MORE_STATE; 272 } 273 274 protected TState getCurrentState() { 275 return stateCount > 0 ? getState(states[stateCount - 1]) : getInitialState(); 276 } 277 278 /** 279 * This method is used from test code as it cannot be assumed that state transition will happen 280 * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in 281 * future. 282 */ 283 public int getCurrentStateId() { 284 return getStateId(getCurrentState()); 285 } 286 287 /** 288 * Set the next state for the procedure. 289 * @param stateId the ordinal() of the state enum (or state id) 290 */ 291 private void setNextState(final int stateId) { 292 if (states == null || states.length == stateCount) { 293 int newCapacity = stateCount + 8; 294 if (states != null) { 295 states = Arrays.copyOf(states, newCapacity); 296 } else { 297 states = new int[newCapacity]; 298 } 299 } 300 states[stateCount++] = stateId; 301 } 302 303 @Override 304 protected void toStringState(StringBuilder builder) { 305 super.toStringState(builder); 306 if (!isFinished() && !isEofState() && getCurrentState() != null) { 307 builder.append(":").append(getCurrentState()); 308 } 309 } 310 311 @Override 312 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 313 StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder(); 314 for (int i = 0; i < stateCount; ++i) { 315 data.addState(states[i]); 316 } 317 serializer.serialize(data.build()); 318 } 319 320 @Override 321 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 322 StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class); 323 stateCount = data.getStateCount(); 324 if (stateCount > 0) { 325 states = new int[stateCount]; 326 for (int i = 0; i < stateCount; ++i) { 327 states[i] = data.getState(i); 328 } 329 if (isEofState()) { 330 stateFlow = Flow.NO_MORE_STATE; 331 } 332 } else { 333 states = null; 334 } 335 } 336}