/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Internal state of the ProcedureExecutor that describes the state of a "Root Procedure". A "Root
 * Procedure" is a Procedure without parent, each subprocedure will be added to the "Root Procedure"
 * stack (or rollback-stack). RootProcedureState is used and managed only by the ProcedureExecutor.
 *
 * <pre>
 *   Long rootProcId = getRootProcedureId(proc);
 *   rollbackStack.get(rootProcId).acquire(proc)
 *   rollbackStack.get(rootProcId).release(proc)
 *   ...
 * </pre>
 *
 * Every method of this class is {@code synchronized} on the instance.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class RootProcedureState<TEnvironment> {
  private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);

  private enum State {
    /** The Procedure is running or ready to run. */
    RUNNING,
    /** The Procedure failed, waiting for the rollback executing. */
    FAILED,
    /**
     * The Procedure failed and the rollback was started; also set on load when a loaded procedure
     * is already in the ROLLEDBACK state.
     */
    ROLLINGBACK,
  }

  // Both collections are created lazily; null means "nothing registered yet".
  private Set<Procedure<TEnvironment>> subprocs = null;
  // Declared as ArrayList (not List) because loadStack() relies on ensureCapacity().
  private ArrayList<Procedure<TEnvironment>> subprocStack = null;
  private State state = State.RUNNING;
  // Number of acquire() calls not yet matched by a release().
  private int running = 0;

  /**
   * @return true if the root procedure is in the FAILED or ROLLINGBACK state
   */
  public synchronized boolean isFailed() {
    switch (state) {
      case ROLLINGBACK:
      case FAILED:
        return true;
      default:
        break;
    }
    return false;
  }

  /**
   * @return true if the root procedure is in the ROLLINGBACK state
   */
  public synchronized boolean isRollingback() {
    return state == State.ROLLINGBACK;
  }

  /**
   * Called by the ProcedureExecutor to mark rollback execution. The transition from FAILED to
   * ROLLINGBACK only happens when no step is currently acquired.
   * @return true if the state was switched to ROLLINGBACK, false if the procedure is not FAILED or
   *         there are still running steps
   */
  protected synchronized boolean setRollback() {
    if (running == 0 && state == State.FAILED) {
      state = State.ROLLINGBACK;
      return true;
    }
    return false;
  }

  /**
   * Called by the ProcedureExecutor to unmark rollback execution: the state is moved back from
   * ROLLINGBACK to FAILED. The expected ROLLINGBACK state is only checked by an assertion.
   */
  protected synchronized void unsetRollback() {
    assert state == State.ROLLINGBACK;
    state = State.FAILED;
  }

  /**
   * @return the ids of the registered subprocedures, or null if no subprocedure was ever added
   */
  protected synchronized long[] getSubprocedureIds() {
    if (subprocs == null) {
      return null;
    }
    return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
  }

  /**
   * @return the internal rollback stack itself (not a copy), or null if no step was ever added
   */
  protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() {
    return subprocStack;
  }

  /**
   * @return the exception of the first procedure in the rollback stack that has one, or null if
   *         the stack is missing or no procedure in it has an exception
   */
  protected synchronized RemoteProcedureException getException() {
    if (subprocStack != null) {
      for (Procedure<TEnvironment> proc : subprocStack) {
        if (proc.hasException()) {
          return proc.getException();
        }
      }
    }
    return null;
  }

  /**
   * Called by the ProcedureExecutor to mark the procedure step as running.
   * @param proc the procedure about to run (not inspected by this method)
   * @return true if the step was accounted as running, false if the root procedure is no longer in
   *         the RUNNING state
   */
  protected synchronized boolean acquire(Procedure<TEnvironment> proc) {
    if (state != State.RUNNING) {
      return false;
    }

    running++;
    return true;
  }

  /**
   * Called by the ProcedureExecutor to mark the procedure step as finished.
   * @param proc the procedure that finished its step (not inspected by this method)
   */
  protected synchronized void release(Procedure<TEnvironment> proc) {
    running--;
  }

  /**
   * Moves the root procedure from RUNNING to FAILED; any other state is left untouched.
   */
  protected synchronized void abort() {
    if (state == State.RUNNING) {
      state = State.FAILED;
    }
  }

  /**
   * Called by the ProcedureExecutor after the procedure step is completed, to add the step to the
   * rollback list (or procedure stack). If the procedure is failed, the root state becomes FAILED.
   * @param proc the procedure whose step just completed; it is told its position in the stack
   */
  protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) {
    if (proc.isFailed()) {
      state = State.FAILED;
    }
    if (subprocStack == null) {
      subprocStack = new ArrayList<>();
    }
    // The new step is appended, so its stack index is the current size of the stack.
    proc.addStackIndex(subprocStack.size());
    LOG.trace("Add procedure {} as the {}th rollback step", proc, subprocStack.size());
    subprocStack.add(proc);
  }

  /**
   * Registers a subprocedure. Procedures without a parent are ignored.
   * @param proc the procedure to register
   */
  protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) {
    if (!proc.hasParent()) {
      return;
    }
    if (subprocs == null) {
      subprocs = new HashSet<>();
    }
    subprocs.add(proc);
  }

  /**
   * Called on store load by the ProcedureExecutor to load part of the stack. Each procedure has its
   * own stack-positions. Which means we have to write to the store only the Procedure we executed,
   * and nothing else. on load we recreate the full stack by aggregating each procedure
   * stack-positions.
   * @param proc the loaded procedure; it is placed at every stack position it reports
   */
  protected synchronized void loadStack(Procedure<TEnvironment> proc) {
    addSubProcedure(proc);
    int[] stackIndexes = proc.getStackIndexes();
    // An empty array carries no stack position; treat it like null instead of failing on
    // stackIndexes[-1].
    if (stackIndexes != null && stackIndexes.length > 0) {
      if (subprocStack == null) {
        subprocStack = new ArrayList<>();
      }
      // The stack is sized from the last reported index.
      int requiredSize = 1 + stackIndexes[stackIndexes.length - 1];
      int diff = requiredSize - subprocStack.size();
      if (diff > 0) {
        subprocStack.ensureCapacity(requiredSize);
        // Pad with null placeholders; positions never filled are detected by isValid().
        while (diff-- > 0) {
          subprocStack.add(null);
        }
      }
      for (int i = 0; i < stackIndexes.length; ++i) {
        subprocStack.set(stackIndexes[i], proc);
      }
    }
    if (proc.getState() == ProcedureState.ROLLEDBACK) {
      state = State.ROLLINGBACK;
    } else if (proc.isFailed()) {
      state = State.FAILED;
    }
  }

  /**
   * Called on store load by the ProcedureExecutor to validate the procedure stack.
   * @return false if the stack contains a position that was never filled (a null entry), true
   *         otherwise, including when there is no stack at all
   */
  protected synchronized boolean isValid() {
    if (subprocStack != null) {
      for (Procedure<TEnvironment> proc : subprocStack) {
        if (proc == null) {
          return false;
        }
      }
    }
    return true;
  }
}