001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024import java.util.concurrent.atomic.AtomicBoolean; 025 026import org.apache.yetus.audience.InterfaceAudience; 027import org.apache.yetus.audience.InterfaceStability; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMachineProcedureData; 034 035/** 036 * Procedure described by a series of steps. 037 * 038 * <p>The procedure implementor must have an enum of 'states', describing 039 * the various step of the procedure. 040 * Once the procedure is running, the procedure-framework will call executeFromState() 041 * using the 'state' provided by the user. The first call to executeFromState() 042 * will be performed with 'state = null'. The implementor can jump between 043 * states using setNextState(MyStateEnum.ordinal()). 044 * The rollback will call rollbackState() for each state that was executed, in reverse order. 045 */ 046@InterfaceAudience.Private 047@InterfaceStability.Evolving 048public abstract class StateMachineProcedure<TEnvironment, TState> 049 extends Procedure<TEnvironment> { 050 private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcedure.class); 051 052 private static final int EOF_STATE = Integer.MIN_VALUE; 053 054 private final AtomicBoolean aborted = new AtomicBoolean(false); 055 056 private Flow stateFlow = Flow.HAS_MORE_STATE; 057 private int stateCount = 0; 058 private int[] states = null; 059 060 private List<Procedure<TEnvironment>> subProcList = null; 061 062 protected final int getCycles() { 063 return cycles; 064 } 065 066 /** 067 * Cycles on same state. Good for figuring if we are stuck. 068 */ 069 private int cycles = 0; 070 071 /** 072 * Ordinal of the previous state. So we can tell if we are progressing or not. 073 */ 074 private int previousState; 075 076 @VisibleForTesting 077 public enum Flow { 078 HAS_MORE_STATE, 079 NO_MORE_STATE, 080 } 081 082 /** 083 * called to perform a single step of the specified 'state' of the procedure 084 * @param state state to execute 085 * @return Flow.NO_MORE_STATE if the procedure is completed, 086 * Flow.HAS_MORE_STATE if there is another step. 087 */ 088 protected abstract Flow executeFromState(TEnvironment env, TState state) 089 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; 090 091 /** 092 * called to perform the rollback of the specified state 093 * @param state state to rollback 094 * @throws IOException temporary failure, the rollback will retry later 095 */ 096 protected abstract void rollbackState(TEnvironment env, TState state) 097 throws IOException, InterruptedException; 098 099 /** 100 * Convert an ordinal (or state id) to an Enum (or more descriptive) state object. 101 * @param stateId the ordinal() of the state enum (or state id) 102 * @return the state enum object 103 */ 104 protected abstract TState getState(int stateId); 105 106 /** 107 * Convert the Enum (or more descriptive) state object to an ordinal (or state id). 108 * @param state the state enum object 109 * @return stateId the ordinal() of the state enum (or state id) 110 */ 111 protected abstract int getStateId(TState state); 112 113 /** 114 * Return the initial state object that will be used for the first call to executeFromState(). 115 * @return the initial state enum object 116 */ 117 protected abstract TState getInitialState(); 118 119 /** 120 * Set the next state for the procedure. 121 * @param state the state enum object 122 */ 123 protected void setNextState(final TState state) { 124 setNextState(getStateId(state)); 125 failIfAborted(); 126 } 127 128 /** 129 * By default, the executor will try ro run all the steps of the procedure start to finish. 130 * Return true to make the executor yield between execution steps to 131 * give other procedures time to run their steps. 132 * @param state the state we are going to execute next. 133 * @return Return true if the executor should yield before the execution of the specified step. 134 * Defaults to return false. 135 */ 136 protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) { 137 return false; 138 } 139 140 /** 141 * Add a child procedure to execute 142 * @param subProcedure the child procedure 143 */ 144 protected <T extends Procedure<TEnvironment>> void addChildProcedure( 145 @SuppressWarnings("unchecked") T... subProcedure) { 146 if (subProcedure == null) { 147 return; 148 } 149 final int len = subProcedure.length; 150 if (len == 0) { 151 return; 152 } 153 if (subProcList == null) { 154 subProcList = new ArrayList<>(len); 155 } 156 for (int i = 0; i < len; ++i) { 157 Procedure<TEnvironment> proc = subProcedure[i]; 158 if (!proc.hasOwner()) { 159 proc.setOwner(getOwner()); 160 } 161 162 subProcList.add(proc); 163 } 164 } 165 166 @Override 167 protected Procedure[] execute(final TEnvironment env) 168 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 169 updateTimestamp(); 170 try { 171 failIfAborted(); 172 173 if (!hasMoreState() || isFailed()) { 174 return null; 175 } 176 177 TState state = getCurrentState(); 178 if (stateCount == 0) { 179 setNextState(getStateId(state)); 180 } 181 182 if (LOG.isTraceEnabled()) { 183 LOG.trace(state + " " + this + "; cycles=" + this.cycles); 184 } 185 // Keep running count of cycles 186 if (getStateId(state) != this.previousState) { 187 this.previousState = getStateId(state); 188 this.cycles = 0; 189 } else { 190 this.cycles++; 191 } 192 193 LOG.trace("{}", this); 194 stateFlow = executeFromState(env, state); 195 if (!hasMoreState()) { 196 setNextState(EOF_STATE); 197 } 198 199 if (subProcList != null && !subProcList.isEmpty()) { 200 Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); 201 subProcList = null; 202 return subProcedures; 203 } 204 return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this}; 205 } finally { 206 updateTimestamp(); 207 } 208 } 209 210 @Override 211 protected void rollback(final TEnvironment env) 212 throws IOException, InterruptedException { 213 if (isEofState()) { 214 stateCount--; 215 } 216 217 try { 218 updateTimestamp(); 219 rollbackState(env, getCurrentState()); 220 stateCount--; 221 } finally { 222 updateTimestamp(); 223 } 224 } 225 226 private boolean isEofState() { 227 return stateCount > 0 && states[stateCount-1] == EOF_STATE; 228 } 229 230 @Override 231 protected boolean abort(final TEnvironment env) { 232 LOG.debug("Abort requested for {}", this); 233 if (!hasMoreState()) { 234 LOG.warn("Ignore abort request on {} because it has already been finished", this); 235 return false; 236 } 237 if (!isRollbackSupported(getCurrentState())) { 238 LOG.warn("Ignore abort request on {} because it does not support rollback", this); 239 return false; 240 } 241 aborted.set(true); 242 return true; 243 } 244 245 /** 246 * If procedure has more states then abort it otherwise procedure is finished and abort can be 247 * ignored. 248 */ 249 protected final void failIfAborted() { 250 if (aborted.get()) { 251 if (hasMoreState()) { 252 setAbortFailure(getClass().getSimpleName(), "abort requested"); 253 } else { 254 LOG.warn("Ignoring abort request on state='" + getCurrentState() + "' for " + this); 255 } 256 } 257 } 258 259 /** 260 * Used by the default implementation of abort() to know if the current state can be aborted 261 * and rollback can be triggered. 262 */ 263 protected boolean isRollbackSupported(final TState state) { 264 return false; 265 } 266 267 @Override 268 protected boolean isYieldAfterExecutionStep(final TEnvironment env) { 269 return isYieldBeforeExecuteFromState(env, getCurrentState()); 270 } 271 272 private boolean hasMoreState() { 273 return stateFlow != Flow.NO_MORE_STATE; 274 } 275 276 protected TState getCurrentState() { 277 return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState(); 278 } 279 280 /** 281 * This method is used from test code as it cannot be assumed that state transition will happen 282 * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in 283 * future. 284 */ 285 @VisibleForTesting 286 public int getCurrentStateId() { 287 return getStateId(getCurrentState()); 288 } 289 290 /** 291 * Set the next state for the procedure. 292 * @param stateId the ordinal() of the state enum (or state id) 293 */ 294 private void setNextState(final int stateId) { 295 if (states == null || states.length == stateCount) { 296 int newCapacity = stateCount + 8; 297 if (states != null) { 298 states = Arrays.copyOf(states, newCapacity); 299 } else { 300 states = new int[newCapacity]; 301 } 302 } 303 states[stateCount++] = stateId; 304 } 305 306 @Override 307 protected void toStringState(StringBuilder builder) { 308 super.toStringState(builder); 309 if (!isFinished() && !isEofState() && getCurrentState() != null) { 310 builder.append(":").append(getCurrentState()); 311 } 312 } 313 314 @Override 315 protected void serializeStateData(ProcedureStateSerializer serializer) 316 throws IOException { 317 StateMachineProcedureData.Builder data = StateMachineProcedureData.newBuilder(); 318 for (int i = 0; i < stateCount; ++i) { 319 data.addState(states[i]); 320 } 321 serializer.serialize(data.build()); 322 } 323 324 @Override 325 protected void deserializeStateData(ProcedureStateSerializer serializer) 326 throws IOException { 327 StateMachineProcedureData data = serializer.deserialize(StateMachineProcedureData.class); 328 stateCount = data.getStateCount(); 329 if (stateCount > 0) { 330 states = new int[stateCount]; 331 for (int i = 0; i < stateCount; ++i) { 332 states[i] = data.getState(i); 333 } 334 if (isEofState()) { 335 stateFlow = Flow.NO_MORE_STATE; 336 } 337 } else { 338 states = null; 339 } 340 } 341}