001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.stream.Collectors; 025 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 031import org.apache.hadoop.hbase.testclassification.MasterTests; 032import org.apache.hadoop.hbase.testclassification.SmallTests; 033import org.junit.AfterClass; 034import org.junit.BeforeClass; 035import org.junit.ClassRule; 036import org.junit.Test; 037import org.junit.experimental.categories.Category; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 042 043 044@Category({MasterTests.class, SmallTests.class}) 045public class TestProcedureBypass { 046 047 @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule 048 .forClass(TestProcedureBypass.class); 049 050 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class); 051 052 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 053 054 private static TestProcEnv procEnv; 055 private static ProcedureStore procStore; 056 057 private static ProcedureExecutor<TestProcEnv> procExecutor; 058 059 private static HBaseCommonTestingUtility htu; 060 061 private static FileSystem fs; 062 private static Path testDir; 063 private static Path logDir; 064 065 private static class TestProcEnv { 066 } 067 068 @BeforeClass 069 public static void setUp() throws Exception { 070 htu = new HBaseCommonTestingUtility(); 071 072 // NOTE: The executor will be created by each test 073 procEnv = new TestProcEnv(); 074 testDir = htu.getDataTestDir(); 075 fs = testDir.getFileSystem(htu.getConfiguration()); 076 assertTrue(testDir.depth() > 1); 077 078 logDir = new Path(testDir, "proc-logs"); 079 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 080 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, 081 procStore); 082 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 083 ProcedureTestingUtility 084 .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 085 } 086 087 @Test 088 public void testBypassSuspendProcedure() throws Exception { 089 final SuspendProcedure proc = new SuspendProcedure(); 090 long id = procExecutor.submitProcedure(proc); 091 Thread.sleep(500); 092 //bypass the procedure 093 assertTrue(procExecutor.bypassProcedure(id, 30000, false, false)); 094 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 095 LOG.info("{} finished", proc); 096 } 097 098 @Test 099 public void testStuckProcedure() throws Exception { 100 final StuckProcedure proc = new StuckProcedure(); 101 long id = procExecutor.submitProcedure(proc); 102 Thread.sleep(500); 103 //bypass the procedure 104 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 105 //Since the procedure is stuck there, we need to restart the executor to recovery. 106 ProcedureTestingUtility.restart(procExecutor); 107 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 108 LOG.info("{} finished", proc); 109 } 110 111 @Test 112 public void testBypassingProcedureWithParent() throws Exception { 113 final RootProcedure proc = new RootProcedure(); 114 long rootId = procExecutor.submitProcedure(proc); 115 htu.waitFor(5000, () -> procExecutor.getProcedures().stream() 116 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()) 117 .size() > 0); 118 SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream() 119 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); 120 assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false)); 121 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 122 LOG.info("{} finished", proc); 123 } 124 125 @Test 126 public void testBypassingStuckStateMachineProcedure() throws Exception { 127 final StuckStateMachineProcedure proc = 128 new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START); 129 long id = procExecutor.submitProcedure(proc); 130 Thread.sleep(500); 131 // bypass the procedure 132 assertFalse(procExecutor.bypassProcedure(id, 1000, false, false)); 133 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 134 135 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 136 LOG.info("{} finished", proc); 137 } 138 139 @Test 140 public void testBypassingProcedureWithParentRecursive() throws Exception { 141 final RootProcedure proc = new RootProcedure(); 142 long rootId = procExecutor.submitProcedure(proc); 143 htu.waitFor(5000, () -> procExecutor.getProcedures().stream() 144 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()) 145 .size() > 0); 146 SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream() 147 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); 148 assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true)); 149 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 150 LOG.info("{} finished", proc); 151 } 152 153 @Test 154 public void testBypassingWaitingTimeoutProcedures() throws Exception { 155 final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure(); 156 long id = procExecutor.submitProcedure(proc); 157 Thread.sleep(500); 158 // bypass the procedure 159 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 160 161 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 162 LOG.info("{} finished", proc); 163 } 164 165 @AfterClass 166 public static void tearDown() throws Exception { 167 procExecutor.stop(); 168 procStore.stop(false); 169 procExecutor.join(); 170 } 171 172 public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 173 174 public SuspendProcedure() { 175 super(); 176 } 177 178 @Override 179 protected Procedure[] execute(final TestProcEnv env) 180 throws ProcedureSuspendedException { 181 // Always suspend the procedure 182 throw new ProcedureSuspendedException(); 183 } 184 } 185 186 public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 187 188 public StuckProcedure() { 189 super(); 190 } 191 192 @Override 193 protected Procedure[] execute(final TestProcEnv env) { 194 try { 195 Thread.sleep(Long.MAX_VALUE); 196 } catch (Throwable t) { 197 LOG.debug("Sleep is interrupted.", t); 198 } 199 return null; 200 } 201 202 } 203 204 205 public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 206 private boolean childSpwaned = false; 207 208 public RootProcedure() { 209 super(); 210 } 211 212 @Override 213 protected Procedure[] execute(final TestProcEnv env) 214 throws ProcedureSuspendedException { 215 if (!childSpwaned) { 216 childSpwaned = true; 217 return new Procedure[] {new SuspendProcedure()}; 218 } else { 219 return null; 220 } 221 } 222 } 223 224 public static class WaitingTimeoutProcedure 225 extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 226 public WaitingTimeoutProcedure() { 227 super(); 228 } 229 230 @Override 231 protected Procedure[] execute(final TestProcEnv env) 232 throws ProcedureSuspendedException { 233 // Always suspend the procedure 234 setTimeout(50000); 235 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 236 skipPersistence(); 237 throw new ProcedureSuspendedException(); 238 } 239 240 @Override 241 protected synchronized boolean setTimeoutFailure(TestProcEnv env) { 242 setState(ProcedureProtos.ProcedureState.RUNNABLE); 243 procExecutor.getScheduler().addFront(this); 244 return false; // 'false' means that this procedure handled the timeout 245 } 246 } 247 248 public enum StuckStateMachineState { 249 START, THEN, END 250 } 251 252 public static class StuckStateMachineProcedure extends 253 ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> { 254 private AtomicBoolean stop = new AtomicBoolean(false); 255 256 public StuckStateMachineProcedure() { 257 super(); 258 } 259 260 public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) { 261 super(env, initialState); 262 } 263 264 @Override 265 protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState) 266 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 267 switch (tState) { 268 case START: 269 LOG.info("PHASE 1: START"); 270 setNextState(StuckStateMachineState.THEN); 271 return Flow.HAS_MORE_STATE; 272 case THEN: 273 if (stop.get()) { 274 setNextState(StuckStateMachineState.END); 275 } 276 return Flow.HAS_MORE_STATE; 277 case END: 278 return Flow.NO_MORE_STATE; 279 default: 280 throw new UnsupportedOperationException("unhandled state=" + tState); 281 } 282 } 283 284 @Override 285 protected StuckStateMachineState getState(int stateId) { 286 return StuckStateMachineState.values()[stateId]; 287 } 288 289 @Override 290 protected int getStateId(StuckStateMachineState tState) { 291 return tState.ordinal(); 292 } 293 } 294 295 296}