001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.stream.Collectors; 025import org.apache.hadoop.fs.FileSystem; 026import org.apache.hadoop.fs.Path; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 030import org.apache.hadoop.hbase.testclassification.MasterTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.junit.AfterClass; 033import org.junit.BeforeClass; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 041 042@Category({ MasterTests.class, SmallTests.class }) 043public class TestProcedureBypass { 044 045 @ClassRule 046 public static final HBaseClassTestRule CLASS_RULE = 047 HBaseClassTestRule.forClass(TestProcedureBypass.class); 048 049 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class); 050 051 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 052 053 private static TestProcEnv procEnv; 054 private static ProcedureStore procStore; 055 056 private static ProcedureExecutor<TestProcEnv> procExecutor; 057 058 private static HBaseCommonTestingUtility htu; 059 060 private static FileSystem fs; 061 private static Path testDir; 062 private static Path logDir; 063 064 private static class TestProcEnv { 065 } 066 067 @BeforeClass 068 public static void setUp() throws Exception { 069 htu = new HBaseCommonTestingUtility(); 070 071 // NOTE: The executor will be created by each test 072 procEnv = new TestProcEnv(); 073 testDir = htu.getDataTestDir(); 074 fs = testDir.getFileSystem(htu.getConfiguration()); 075 assertTrue(testDir.depth() > 1); 076 077 logDir = new Path(testDir, "proc-logs"); 078 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 079 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 080 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 081 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 082 } 083 084 @Test 085 public void testBypassSuspendProcedure() throws Exception { 086 final SuspendProcedure proc = new SuspendProcedure(); 087 long id = procExecutor.submitProcedure(proc); 088 Thread.sleep(500); 089 // bypass the procedure 090 assertTrue(procExecutor.bypassProcedure(id, 30000, false, false)); 091 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 092 LOG.info("{} finished", proc); 093 } 094 095 @Test 096 public void testStuckProcedure() throws Exception { 097 final StuckProcedure proc = new StuckProcedure(); 098 long id = procExecutor.submitProcedure(proc); 099 Thread.sleep(500); 100 // bypass the procedure 101 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 102 // Since the procedure is stuck there, we need to restart the executor to recovery. 103 ProcedureTestingUtility.restart(procExecutor); 104 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 105 LOG.info("{} finished", proc); 106 } 107 108 @Test 109 public void testBypassingProcedureWithParent() throws Exception { 110 final RootProcedure proc = new RootProcedure(); 111 long rootId = procExecutor.submitProcedure(proc); 112 htu.waitFor(5000, () -> procExecutor.getProcedures().stream() 113 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).size() > 0); 114 SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().stream() 115 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); 116 assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false)); 117 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 118 LOG.info("{} finished", proc); 119 } 120 121 @Test 122 public void testBypassingStuckStateMachineProcedure() throws Exception { 123 final StuckStateMachineProcedure proc = 124 new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START); 125 long id = procExecutor.submitProcedure(proc); 126 Thread.sleep(500); 127 // bypass the procedure 128 assertFalse(procExecutor.bypassProcedure(id, 1000, false, false)); 129 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 130 131 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 132 LOG.info("{} finished", proc); 133 } 134 135 @Test 136 public void testBypassingProcedureWithParentRecursive() throws Exception { 137 final RootProcedure proc = new RootProcedure(); 138 long rootId = procExecutor.submitProcedure(proc); 139 htu.waitFor(5000, () -> procExecutor.getProcedures().stream() 140 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).size() > 0); 141 SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().stream() 142 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); 143 assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true)); 144 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 145 LOG.info("{} finished", proc); 146 } 147 148 @Test 149 public void testBypassingWaitingTimeoutProcedures() throws Exception { 150 final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure(); 151 long id = procExecutor.submitProcedure(proc); 152 Thread.sleep(500); 153 // bypass the procedure 154 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 155 156 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 157 LOG.info("{} finished", proc); 158 } 159 160 @AfterClass 161 public static void tearDown() throws Exception { 162 procExecutor.stop(); 163 procStore.stop(false); 164 procExecutor.join(); 165 } 166 167 public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 168 169 public SuspendProcedure() { 170 super(); 171 } 172 173 @Override 174 protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { 175 // Always suspend the procedure 176 throw new ProcedureSuspendedException(); 177 } 178 } 179 180 public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 181 182 public StuckProcedure() { 183 super(); 184 } 185 186 @Override 187 protected Procedure[] execute(final TestProcEnv env) { 188 try { 189 Thread.sleep(Long.MAX_VALUE); 190 } catch (Throwable t) { 191 LOG.debug("Sleep is interrupted.", t); 192 } 193 return null; 194 } 195 196 } 197 198 public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 199 private boolean childSpwaned = false; 200 201 public RootProcedure() { 202 super(); 203 } 204 205 @Override 206 protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { 207 if (!childSpwaned) { 208 childSpwaned = true; 209 return new Procedure[] { new SuspendProcedure() }; 210 } else { 211 return null; 212 } 213 } 214 } 215 216 public static class WaitingTimeoutProcedure 217 extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 218 public WaitingTimeoutProcedure() { 219 super(); 220 } 221 222 @Override 223 protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException { 224 // Always suspend the procedure 225 setTimeout(50000); 226 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 227 skipPersistence(); 228 throw new ProcedureSuspendedException(); 229 } 230 231 @Override 232 protected synchronized boolean setTimeoutFailure(TestProcEnv env) { 233 setState(ProcedureProtos.ProcedureState.RUNNABLE); 234 procExecutor.getScheduler().addFront(this); 235 return false; // 'false' means that this procedure handled the timeout 236 } 237 } 238 239 public enum StuckStateMachineState { 240 START, 241 THEN, 242 END 243 } 244 245 public static class StuckStateMachineProcedure 246 extends ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> { 247 private AtomicBoolean stop = new AtomicBoolean(false); 248 249 public StuckStateMachineProcedure() { 250 super(); 251 } 252 253 public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) { 254 super(env, initialState); 255 } 256 257 @Override 258 protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState) 259 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 260 switch (tState) { 261 case START: 262 LOG.info("PHASE 1: START"); 263 setNextState(StuckStateMachineState.THEN); 264 return Flow.HAS_MORE_STATE; 265 case THEN: 266 if (stop.get()) { 267 setNextState(StuckStateMachineState.END); 268 } 269 return Flow.HAS_MORE_STATE; 270 case END: 271 return Flow.NO_MORE_STATE; 272 default: 273 throw new UnsupportedOperationException("unhandled state=" + tState); 274 } 275 } 276 277 @Override 278 protected StuckStateMachineState getState(int stateId) { 279 return StuckStateMachineState.values()[stateId]; 280 } 281 282 @Override 283 protected int getStateId(StuckStateMachineState tState) { 284 return tState.ordinal(); 285 } 286 } 287 288}