001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertTrue; 022 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.stream.Collectors; 025 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 031import org.apache.hadoop.hbase.testclassification.MasterTests; 032import org.apache.hadoop.hbase.testclassification.SmallTests; 033import org.junit.AfterClass; 034import org.junit.BeforeClass; 035import org.junit.ClassRule; 036import org.junit.Test; 037import org.junit.experimental.categories.Category; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 042 043 044@Category({MasterTests.class, SmallTests.class}) 045public class TestProcedureBypass { 046 047 @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule 048 .forClass(TestProcedureBypass.class); 049 050 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class); 051 052 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 053 054 private static TestProcEnv procEnv; 055 private static ProcedureStore procStore; 056 057 private static ProcedureExecutor<TestProcEnv> procExecutor; 058 059 private static HBaseCommonTestingUtility htu; 060 061 private static FileSystem fs; 062 private static Path testDir; 063 private static Path logDir; 064 065 private static class TestProcEnv { 066 } 067 068 @BeforeClass 069 public static void setUp() throws Exception { 070 htu = new HBaseCommonTestingUtility(); 071 072 // NOTE: The executor will be created by each test 073 procEnv = new TestProcEnv(); 074 testDir = htu.getDataTestDir(); 075 fs = testDir.getFileSystem(htu.getConfiguration()); 076 assertTrue(testDir.depth() > 1); 077 078 logDir = new Path(testDir, "proc-logs"); 079 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 080 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, 081 procStore); 082 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 083 ProcedureTestingUtility 084 .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true); 085 } 086 087 @Test 088 public void testBypassSuspendProcedure() throws Exception { 089 final SuspendProcedure proc = new SuspendProcedure(); 090 long id = procExecutor.submitProcedure(proc); 091 Thread.sleep(500); 092 //bypass the procedure 093 assertTrue(procExecutor.bypassProcedure(id, 30000, false, false)); 094 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 095 LOG.info("{} finished", proc); 096 } 097 098 @Test 099 public void testStuckProcedure() throws Exception { 100 final StuckProcedure proc = new StuckProcedure(); 101 long id = procExecutor.submitProcedure(proc); 102 Thread.sleep(500); 103 //bypass the procedure 104 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 105 //Since the procedure is stuck there, we need to restart the executor to recovery. 106 ProcedureTestingUtility.restart(procExecutor); 107 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 108 LOG.info("{} finished", proc); 109 } 110 111 @Test 112 public void testBypassingProcedureWithParent() throws Exception { 113 final RootProcedure proc = new RootProcedure(); 114 long rootId = procExecutor.submitProcedure(proc); 115 htu.waitFor(5000, () -> procExecutor.getProcedures().stream() 116 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()) 117 .size() > 0); 118 SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream() 119 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); 120 assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false)); 121 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 122 LOG.info("{} finished", proc); 123 } 124 125 @Test 126 public void testBypassingProcedureWithParentRecursive() throws Exception { 127 final RootProcedure proc = new RootProcedure(); 128 long rootId = procExecutor.submitProcedure(proc); 129 htu.waitFor(5000, () -> procExecutor.getProcedures().stream() 130 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()) 131 .size() > 0); 132 SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream() 133 .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0); 134 assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true)); 135 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 136 LOG.info("{} finished", proc); 137 } 138 139 @Test 140 public void testBypassingStuckStateMachineProcedure() throws Exception { 141 final StuckStateMachineProcedure proc = 142 new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START); 143 long id = procExecutor.submitProcedure(proc); 144 Thread.sleep(500); 145 // bypass the procedure 146 assertFalse(procExecutor.bypassProcedure(id, 1000, false, false)); 147 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 148 149 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 150 LOG.info("{} finished", proc); 151 } 152 153 @Test 154 public void testBypassingWaitingTimeoutProcedures() throws Exception { 155 final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure(); 156 long id = procExecutor.submitProcedure(proc); 157 Thread.sleep(500); 158 // bypass the procedure 159 assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); 160 161 htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); 162 LOG.info("{} finished", proc); 163 } 164 165 @AfterClass 166 public static void tearDown() throws Exception { 167 procExecutor.stop(); 168 procStore.stop(false); 169 procExecutor.join(); 170 } 171 172 public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 173 174 public SuspendProcedure() { 175 super(); 176 } 177 178 @Override 179 protected Procedure[] execute(final TestProcEnv env) 180 throws ProcedureSuspendedException { 181 // Always suspend the procedure 182 throw new ProcedureSuspendedException(); 183 } 184 } 185 186 public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 187 188 public StuckProcedure() { 189 super(); 190 } 191 192 @Override 193 protected Procedure[] execute(final TestProcEnv env) { 194 try { 195 Thread.sleep(Long.MAX_VALUE); 196 } catch (Throwable t) { 197 LOG.debug("Sleep is interrupted.", t); 198 } 199 return null; 200 } 201 202 } 203 204 public static class WaitingTimeoutProcedure 205 extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 206 public WaitingTimeoutProcedure() { 207 super(); 208 } 209 210 @Override 211 protected Procedure[] execute(final TestProcEnv env) 212 throws ProcedureSuspendedException { 213 // Always suspend the procedure 214 setTimeout(50000); 215 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 216 skipPersistence(); 217 throw new ProcedureSuspendedException(); 218 } 219 220 @Override 221 protected synchronized boolean setTimeoutFailure(TestProcEnv env) { 222 setState(ProcedureProtos.ProcedureState.RUNNABLE); 223 procExecutor.getScheduler().addFront(this); 224 return false; // 'false' means that this procedure handled the timeout 225 } 226 } 227 228 public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { 229 private boolean childSpwaned = false; 230 231 public RootProcedure() { 232 super(); 233 } 234 235 @Override 236 protected Procedure[] execute(final TestProcEnv env) 237 throws ProcedureSuspendedException { 238 if (!childSpwaned) { 239 childSpwaned = true; 240 return new Procedure[] {new SuspendProcedure()}; 241 } else { 242 return null; 243 } 244 } 245 } 246 247 public enum StuckStateMachineState { 248 START, THEN, END 249 } 250 251 public static class StuckStateMachineProcedure extends 252 ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> { 253 private AtomicBoolean stop = new AtomicBoolean(false); 254 255 public StuckStateMachineProcedure() { 256 super(); 257 } 258 259 public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) { 260 super(env, initialState); 261 } 262 263 @Override 264 protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState) 265 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 266 switch (tState) { 267 case START: 268 LOG.info("PHASE 1: START"); 269 setNextState(StuckStateMachineState.THEN); 270 return Flow.HAS_MORE_STATE; 271 case THEN: 272 if (stop.get()) { 273 setNextState(StuckStateMachineState.END); 274 } 275 return Flow.HAS_MORE_STATE; 276 case END: 277 return Flow.NO_MORE_STATE; 278 default: 279 throw new UnsupportedOperationException("unhandled state=" + tState); 280 } 281 } 282 283 @Override 284 protected StuckStateMachineState getState(int stateId) { 285 return StuckStateMachineState.values()[stateId]; 286 } 287 288 @Override 289 protected int getStateId(StuckStateMachineState tState) { 290 return tState.ordinal(); 291 } 292 } 293}