001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertFalse; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.stream.Collectors; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 029import org.apache.hadoop.hbase.testclassification.MasterTests; 030import org.apache.hadoop.hbase.testclassification.SmallTests; 031import org.junit.jupiter.api.AfterAll; 032import org.junit.jupiter.api.BeforeAll; 033import org.junit.jupiter.api.Tag; 034import org.junit.jupiter.api.Test; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 039 040@Tag(MasterTests.TAG) 041@Tag(SmallTests.TAG) 042public class TestProcedureBypass { 043 044 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class); 045 046 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 047 048 private static TestProcEnv procEnv; 049 private static ProcedureStore procStore; 050 051 private static ProcedureExecutor<TestProcEnv> procExecutor; 052 053 private static HBaseCommonTestingUtil htu; 054 055 private static FileSystem fs; 056 private static Path testDir; 057 private static Path logDir; 058 059 private static class TestProcEnv { 060 } 061 062 @BeforeAll 063 public static void setUp() throws Exception { 064 htu = new HBaseCommonTestingUtil(); 065 066 // NOTE: The executor will be created by each test 067 procEnv = new TestProcEnv(); 068 testDir = htu.getDataTestDir(); 069 fs = testDir.getFileSystem(htu.getConfiguration()); 070 assertTrue(testDir.depth() > 1); 071 072 logDir = new Path(testDir, "proc-logs"); 073 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 074 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 075 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 076 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 077 } 078 079 @AfterAll 080 public static void tearDown() throws Exception { 081 procExecutor.stop(); 082 procStore.stop(false); 083 procExecutor.join(); 084 } 085 086 @Test 087 public void testBypassSuspendProcedure() throws Exception { 088 final SuspendProcedure proc = new SuspendProcedure(); 089 long id = procExecutor.submitProcedure(proc); 090 Thread.sleep(500); 091 // bypass the procedure 092 assertTrue(procExecutor.bypassProcedure(id, 30000, false, false)); 093 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 094 LOG.info("{} finished", proc); 095 } 096 097 @Test 098 public void testStuckProcedure() throws Exception { 099 final StuckProcedure proc = new StuckProcedure(); 100 long id = procExecutor.submitProcedure(proc); 101 Thread.sleep(500); 102 // bypass the procedure 103 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 104 // Since the procedure is stuck there, we need to restart the executor to recovery. 105 ProcedureTestingUtility.restart(procExecutor); 106 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 107 LOG.info("{} finished", proc); 108 } 109 110 @Test 111 public void testBypassingProcedureWithParent() throws Exception { 112 final RootProcedure proc = new RootProcedure(); 113 long rootId = procExecutor.submitProcedure(proc); 114 htu.waitFor(5000, () -> procExecutor.getProcedures().stream() 115 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).size() > 0); 116 SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().stream() 117 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); 118 assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false)); 119 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 120 LOG.info("{} finished", proc); 121 } 122 123 @Test 124 public void testBypassingStuckStateMachineProcedure() throws Exception { 125 final StuckStateMachineProcedure proc = 126 new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START); 127 long id = procExecutor.submitProcedure(proc); 128 Thread.sleep(500); 129 // bypass the procedure 130 assertFalse(procExecutor.bypassProcedure(id, 1000, false, false)); 131 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 132 133 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 134 LOG.info("{} finished", proc); 135 } 136 137 @Test 138 public void testBypassingProcedureWithParentRecursive() throws Exception { 139 final RootProcedure proc = new RootProcedure(); 140 long rootId = procExecutor.submitProcedure(proc); 141 htu.waitFor(5000, () -> procExecutor.getProcedures().stream() 142 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).size() > 0); 143 SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().stream() 144 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); 145 assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true)); 146 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 147 LOG.info("{} finished", proc); 148 } 149 150 @Test 151 public void testBypassingWaitingTimeoutProcedures() throws Exception { 152 final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure(); 153 long id = procExecutor.submitProcedure(proc); 154 Thread.sleep(500); 155 // bypass the procedure 156 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 157 158 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 159 LOG.info("{} finished", proc); 160 } 161 162 public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 163 164 public SuspendProcedure() { 165 super(); 166 } 167 168 @Override 169 protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { 170 // Always suspend the procedure 171 throw new ProcedureSuspendedException(); 172 } 173 } 174 175 public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 176 177 public StuckProcedure() { 178 super(); 179 } 180 181 @Override 182 protected Procedure[] execute(final TestProcEnv env) { 183 try { 184 Thread.sleep(Long.MAX_VALUE); 185 } catch (Throwable t) { 186 LOG.debug("Sleep is interrupted.", t); 187 } 188 return null; 189 } 190 191 } 192 193 public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 194 private boolean childSpwaned = false; 195 196 public RootProcedure() { 197 super(); 198 } 199 200 @Override 201 protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { 202 if (!childSpwaned) { 203 childSpwaned = true; 204 return new Procedure[] { new SuspendProcedure() }; 205 } else { 206 return null; 207 } 208 } 209 } 210 211 public static class WaitingTimeoutProcedure 212 extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 213 public WaitingTimeoutProcedure() { 214 super(); 215 } 216 217 @Override 218 protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { 219 // Always suspend the procedure 220 setTimeout(50000); 221 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 222 skipPersistence(); 223 throw new ProcedureSuspendedException(); 224 } 225 226 @Override 227 protected synchronized boolean setTimeoutFailure(TestProcEnv env) { 228 setState(ProcedureProtos.ProcedureState.RUNNABLE); 229 procExecutor.getScheduler().addFront(this); 230 return false; // 'false' means that this procedure handled the timeout 231 } 232 } 233 234 public enum StuckStateMachineState { 235 START, 236 THEN, 237 END 238 } 239 240 public static class StuckStateMachineProcedure 241 extends ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> { 242 private AtomicBoolean stop = new AtomicBoolean(false); 243 244 public StuckStateMachineProcedure() { 245 super(); 246 } 247 248 public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) { 249 super(env, initialState); 250 } 251 252 @Override 253 protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState) 254 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 255 switch (tState) { 256 case START: 257 LOG.info("PHASE 1: START"); 258 setNextState(StuckStateMachineState.THEN); 259 return Flow.HAS_MORE_STATE; 260 case THEN: 261 if (stop.get()) { 262 setNextState(StuckStateMachineState.END); 263 } 264 return Flow.HAS_MORE_STATE; 265 case END: 266 return Flow.NO_MORE_STATE; 267 default: 268 throw new UnsupportedOperationException("unhandled state=" + tState); 269 } 270 } 271 272 @Override 273 protected StuckStateMachineState getState(int stateId) { 274 return StuckStateMachineState.values()[stateId]; 275 } 276 277 @Override 278 protected int getStateId(StuckStateMachineState tState) { 279 return tState.ordinal(); 280 } 281 } 282}