001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.stream.Collectors;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
030import org.apache.hadoop.hbase.testclassification.MasterTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.junit.AfterClass;
033import org.junit.BeforeClass;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
041
042@Category({ MasterTests.class, SmallTests.class })
043public class TestProcedureBypass {
044
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047    HBaseClassTestRule.forClass(TestProcedureBypass.class);
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class);
050
051  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
052
053  private static TestProcEnv procEnv;
054  private static ProcedureStore procStore;
055
056  private static ProcedureExecutor<TestProcEnv> procExecutor;
057
058  private static HBaseCommonTestingUtility htu;
059
060  private static FileSystem fs;
061  private static Path testDir;
062  private static Path logDir;
063
064  private static class TestProcEnv {
065  }
066
067  @BeforeClass
068  public static void setUp() throws Exception {
069    htu = new HBaseCommonTestingUtility();
070
071    // NOTE: The executor will be created by each test
072    procEnv = new TestProcEnv();
073    testDir = htu.getDataTestDir();
074    fs = testDir.getFileSystem(htu.getConfiguration());
075    assertTrue(testDir.depth() > 1);
076
077    logDir = new Path(testDir, "proc-logs");
078    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
079    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
080    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
081    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
082  }
083
084  @Test
085  public void testBypassSuspendProcedure() throws Exception {
086    final SuspendProcedure proc = new SuspendProcedure();
087    long id = procExecutor.submitProcedure(proc);
088    Thread.sleep(500);
089    // bypass the procedure
090    assertTrue(procExecutor.bypassProcedure(id, 30000, false, false));
091    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
092    LOG.info("{} finished", proc);
093  }
094
095  @Test
096  public void testStuckProcedure() throws Exception {
097    final StuckProcedure proc = new StuckProcedure();
098    long id = procExecutor.submitProcedure(proc);
099    Thread.sleep(500);
100    // bypass the procedure
101    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
102    // Since the procedure is stuck there, we need to restart the executor to recovery.
103    ProcedureTestingUtility.restart(procExecutor);
104    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
105    LOG.info("{} finished", proc);
106  }
107
108  @Test
109  public void testBypassingProcedureWithParent() throws Exception {
110    final RootProcedure proc = new RootProcedure();
111    long rootId = procExecutor.submitProcedure(proc);
112    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
113      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).size() > 0);
114    SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().stream()
115      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
116    assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false));
117    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
118    LOG.info("{} finished", proc);
119  }
120
121  @Test
122  public void testBypassingStuckStateMachineProcedure() throws Exception {
123    final StuckStateMachineProcedure proc =
124      new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START);
125    long id = procExecutor.submitProcedure(proc);
126    Thread.sleep(500);
127    // bypass the procedure
128    assertFalse(procExecutor.bypassProcedure(id, 1000, false, false));
129    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
130
131    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
132    LOG.info("{} finished", proc);
133  }
134
135  @Test
136  public void testBypassingProcedureWithParentRecursive() throws Exception {
137    final RootProcedure proc = new RootProcedure();
138    long rootId = procExecutor.submitProcedure(proc);
139    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
140      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).size() > 0);
141    SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().stream()
142      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
143    assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true));
144    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
145    LOG.info("{} finished", proc);
146  }
147
148  @Test
149  public void testBypassingWaitingTimeoutProcedures() throws Exception {
150    final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure();
151    long id = procExecutor.submitProcedure(proc);
152    Thread.sleep(500);
153    // bypass the procedure
154    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
155
156    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
157    LOG.info("{} finished", proc);
158  }
159
160  @AfterClass
161  public static void tearDown() throws Exception {
162    procExecutor.stop();
163    procStore.stop(false);
164    procExecutor.join();
165  }
166
167  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
168
169    public SuspendProcedure() {
170      super();
171    }
172
173    @Override
174    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
175      // Always suspend the procedure
176      throw new ProcedureSuspendedException();
177    }
178  }
179
180  public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
181
182    public StuckProcedure() {
183      super();
184    }
185
186    @Override
187    protected Procedure[] execute(final TestProcEnv env) {
188      try {
189        Thread.sleep(Long.MAX_VALUE);
190      } catch (Throwable t) {
191        LOG.debug("Sleep is interrupted.", t);
192      }
193      return null;
194    }
195
196  }
197
198  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
199    private boolean childSpwaned = false;
200
201    public RootProcedure() {
202      super();
203    }
204
205    @Override
206    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
207      if (!childSpwaned) {
208        childSpwaned = true;
209        return new Procedure[] { new SuspendProcedure() };
210      } else {
211        return null;
212      }
213    }
214  }
215
216  public static class WaitingTimeoutProcedure
217    extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
218    public WaitingTimeoutProcedure() {
219      super();
220    }
221
222    @Override
223    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
224      // Always suspend the procedure
225      setTimeout(50000);
226      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
227      skipPersistence();
228      throw new ProcedureSuspendedException();
229    }
230
231    @Override
232    protected synchronized boolean setTimeoutFailure(TestProcEnv env) {
233      setState(ProcedureProtos.ProcedureState.RUNNABLE);
234      procExecutor.getScheduler().addFront(this);
235      return false; // 'false' means that this procedure handled the timeout
236    }
237  }
238
239  public enum StuckStateMachineState {
240    START,
241    THEN,
242    END
243  }
244
245  public static class StuckStateMachineProcedure
246    extends ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> {
247    private AtomicBoolean stop = new AtomicBoolean(false);
248
249    public StuckStateMachineProcedure() {
250      super();
251    }
252
253    public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) {
254      super(env, initialState);
255    }
256
257    @Override
258    protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState)
259      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
260      switch (tState) {
261        case START:
262          LOG.info("PHASE 1: START");
263          setNextState(StuckStateMachineState.THEN);
264          return Flow.HAS_MORE_STATE;
265        case THEN:
266          if (stop.get()) {
267            setNextState(StuckStateMachineState.END);
268          }
269          return Flow.HAS_MORE_STATE;
270        case END:
271          return Flow.NO_MORE_STATE;
272        default:
273          throw new UnsupportedOperationException("unhandled state=" + tState);
274      }
275    }
276
277    @Override
278    protected StuckStateMachineState getState(int stateId) {
279      return StuckStateMachineState.values()[stateId];
280    }
281
282    @Override
283    protected int getStateId(StuckStateMachineState tState) {
284      return tState.ordinal();
285    }
286  }
287
288}