001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.List; 025import java.util.concurrent.atomic.AtomicBoolean; 026 027import org.apache.yetus.audience.InterfaceAudience; 028import org.apache.yetus.audience.InterfaceStability; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 033 034import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData; 035 036/** 037 * Procedure described by a series of steps. 038 * 039 * <p>The procedure implementor must have an enum of 'states', describing 040 * the various step of the procedure. 041 * Once the procedure is running, the procedure-framework will call executeFromState() 042 * using the 'state' provided by the user. The first call to executeFromState() 043 * will be performed with 'state = null'. The implementor can jump between 044 * states using setNextState(MyStateEnum.ordinal()). 045 * The rollback will call rollbackState() for each state that was executed, in reverse order. 046 */ 047@InterfaceAudience.Private 048@InterfaceStability.Evolving 049public abstract class StateMachineProcedure<TEnvironment, TState> 050 extends Procedure<TEnvironment> { 051 private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class); 052 053 private static final int EOF_STATE = Integer.MIN_VALUE; 054 055 private final AtomicBoolean aborted = new AtomicBoolean(false); 056 057 private Flow stateFlow = Flow.HAS_MORE_STATE; 058 protected int stateCount = 0; 059 private int[] states = null; 060 061 private List<Procedure<TEnvironment>> subProcList = null; 062 063 protected final int getCycles() { 064 return cycles; 065 } 066 067 /** 068 * Cycles on same state. Good for figuring if we are stuck. 069 */ 070 private int cycles = 0; 071 072 /** 073 * Ordinal of the previous state. So we can tell if we are progressing or not. 074 */ 075 private int previousState; 076 077 @VisibleForTesting 078 public enum Flow { 079 HAS_MORE_STATE, 080 NO_MORE_STATE, 081 } 082 083 /** 084 * called to perform a single step of the specified 'state' of the procedure 085 * @param state state to execute 086 * @return Flow.NO_MORE_STATE if the procedure is completed, 087 * Flow.HAS_MORE_STATE if there is another step. 088 */ 089 protected abstract Flow executeFromState(TEnvironment env, TState state) 090 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; 091 092 /** 093 * called to perform the rollback of the specified state 094 * @param state state to rollback 095 * @throws IOException temporary failure, the rollback will retry later 096 */ 097 protected abstract void rollbackState(TEnvironment env, TState state) 098 throws IOException, InterruptedException; 099 100 /** 101 * Convert an ordinal (or state id) to an Enum (or more descriptive) state object. 102 * @param stateId the ordinal() of the state enum (or state id) 103 * @return the state enum object 104 */ 105 protected abstract TState getState(int stateId); 106 107 /** 108 * Convert the Enum (or more descriptive) state object to an ordinal (or state id). 109 * @param state the state enum object 110 * @return stateId the ordinal() of the state enum (or state id) 111 */ 112 protected abstract int getStateId(TState state); 113 114 /** 115 * Return the initial state object that will be used for the first call to executeFromState(). 116 * @return the initial state enum object 117 */ 118 protected abstract TState getInitialState(); 119 120 /** 121 * Set the next state for the procedure. 122 * @param state the state enum object 123 */ 124 protected void setNextState(final TState state) { 125 setNextState(getStateId(state)); 126 failIfAborted(); 127 } 128 129 /** 130 * By default, the executor will try ro run all the steps of the procedure start to finish. 131 * Return true to make the executor yield between execution steps to 132 * give other procedures time to run their steps. 133 * @param state the state we are going to execute next. 134 * @return Return true if the executor should yield before the execution of the specified step. 135 * Defaults to return false. 136 */ 137 protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) { 138 return false; 139 } 140 141 /** 142 * Add a child procedure to execute 143 * @param subProcedure the child procedure 144 */ 145 protected <T extends Procedure<TEnvironment>> void addChildProcedure( 146 @SuppressWarnings("unchecked") T... subProcedure) { 147 if (subProcedure == null) { 148 return; 149 } 150 final int len = subProcedure.length; 151 if (len == 0) { 152 return; 153 } 154 if (subProcList == null) { 155 subProcList = new ArrayList<>(len); 156 } 157 for (int i = 0; i < len; ++i) { 158 Procedure<TEnvironment> proc = subProcedure[i]; 159 if (!proc.hasOwner()) proc.setOwner(getOwner()); 160 subProcList.add(proc); 161 } 162 } 163 164 @Override 165 protected Procedure[] execute(final TEnvironment env) 166 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 167 updateTimestamp(); 168 try { 169 failIfAborted(); 170 171 if (!hasMoreState() || isFailed()) return null; 172 TState state = getCurrentState(); 173 if (stateCount == 0) { 174 setNextState(getStateId(state)); 175 } 176 177 if (LOG.isTraceEnabled()) { 178 LOG.trace(state + " " + this + "; cycles=" + this.cycles); 179 } 180 // Keep running count of cycles 181 if (getStateId(state) != this.previousState) { 182 this.previousState = getStateId(state); 183 this.cycles = 0; 184 } else { 185 this.cycles++; 186 } 187 188 LOG.trace("{}", toString()); 189 stateFlow = executeFromState(env, state); 190 if (!hasMoreState()) setNextState(EOF_STATE); 191 if (subProcList != null && !subProcList.isEmpty()) { 192 Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); 193 subProcList = null; 194 return subProcedures; 195 } 196 return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this}; 197 } finally { 198 updateTimestamp(); 199 } 200 } 201 202 @Override 203 protected void rollback(final TEnvironment env) 204 throws IOException, InterruptedException { 205 if (isEofState()) stateCount--; 206 try { 207 updateTimestamp(); 208 rollbackState(env, getCurrentState()); 209 } finally { 210 stateCount--; 211 updateTimestamp(); 212 } 213 } 214 215 protected boolean isEofState() { 216 return stateCount > 0 && states[stateCount-1] == EOF_STATE; 217 } 218 219 @Override 220 protected boolean abort(final TEnvironment env) { 221 LOG.debug("Abort requested for {}", this); 222 if (!hasMoreState()) { 223 LOG.warn("Ignore abort request on {} because it has already been finished", this); 224 return false; 225 } 226 if (!isRollbackSupported(getCurrentState())) { 227 LOG.warn("Ignore abort request on {} because it does not support rollback", this); 228 return false; 229 } 230 aborted.set(true); 231 return true; 232 } 233 234 /** 235 * If procedure has more states then abort it otherwise procedure is finished and abort can be 236 * ignored. 237 */ 238 protected final void failIfAborted() { 239 if (aborted.get()) { 240 if (hasMoreState()) { 241 setAbortFailure(getClass().getSimpleName(), "abort requested"); 242 } else { 243 LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this); 244 } 245 } 246 } 247 248 /** 249 * Used by the default implementation of abort() to know if the current state can be aborted 250 * and rollback can be triggered. 251 */ 252 protected boolean isRollbackSupported(final TState state) { 253 return false; 254 } 255 256 @Override 257 protected boolean isYieldAfterExecutionStep(final TEnvironment env) { 258 return isYieldBeforeExecuteFromState(env, getCurrentState()); 259 } 260 261 private boolean hasMoreState() { 262 return stateFlow != Flow.NO_MORE_STATE; 263 } 264 265 protected TState getCurrentState() { 266 return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState(); 267 } 268 269 /** 270 * This method is used from test code as it cannot be assumed that state transition will happen 271 * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in 272 * future. 273 */ 274 @VisibleForTesting 275 public int getCurrentStateId() { 276 return getStateId(getCurrentState()); 277 } 278 279 /** 280 * Set the next state for the procedure. 281 * @param stateId the ordinal() of the state enum (or state id) 282 */ 283 private void setNextState(final int stateId) { 284 if (states == null || states.length == stateCount) { 285 int newCapacity = stateCount + 8; 286 if (states != null) { 287 states = Arrays.copyOf(states, newCapacity); 288 } else { 289 states = new int[newCapacity]; 290 } 291 } 292 states[stateCount++] = stateId; 293 } 294 295 @Override 296 protected void toStringState(StringBuilder builder) { 297 super.toStringState(builder); 298 if (!isFinished() && !isEofState() && getCurrentState() != null) { 299 builder.append(":").append(getCurrentState()); 300 } 301 } 302 303 @Override 304 protected void serializeStateData(ProcedureStateSerializer serializer) 305 throws IOException { 306 StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder(); 307 for (int i = 0; i < stateCount; ++i) { 308 data.addState(states[i]); 309 } 310 serializer.serialize(data.build()); 311 } 312 313 @Override 314 protected void deserializeStateData(ProcedureStateSerializer serializer) 315 throws IOException { 316 StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class); 317 stateCount = data.getStateCount(); 318 if (stateCount > 0) { 319 states = new int[stateCount]; 320 for (int i = 0; i < stateCount; ++i) { 321 states[i] = data.getState(i); 322 } 323 if (isEofState()) { 324 stateFlow = Flow.NO_MORE_STATE; 325 } 326 } else { 327 states = null; 328 } 329 } 330}