001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.stream.Collectors;
025import org.apache.hadoop.fs.FileSystem;
026import org.apache.hadoop.fs.Path;
027import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
029import org.apache.hadoop.hbase.testclassification.MasterTests;
030import org.apache.hadoop.hbase.testclassification.SmallTests;
031import org.junit.jupiter.api.AfterAll;
032import org.junit.jupiter.api.BeforeAll;
033import org.junit.jupiter.api.Tag;
034import org.junit.jupiter.api.Test;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
039
040@Tag(MasterTests.TAG)
041@Tag(SmallTests.TAG)
042public class TestProcedureBypass {
043
044  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class);
045
046  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
047
048  private static TestProcEnv procEnv;
049  private static ProcedureStore procStore;
050
051  private static ProcedureExecutor<TestProcEnv> procExecutor;
052
053  private static HBaseCommonTestingUtil htu;
054
055  private static FileSystem fs;
056  private static Path testDir;
057  private static Path logDir;
058
059  private static class TestProcEnv {
060  }
061
062  @BeforeAll
063  public static void setUp() throws Exception {
064    htu = new HBaseCommonTestingUtil();
065
066    // NOTE: The executor will be created by each test
067    procEnv = new TestProcEnv();
068    testDir = htu.getDataTestDir();
069    fs = testDir.getFileSystem(htu.getConfiguration());
070    assertTrue(testDir.depth() > 1);
071
072    logDir = new Path(testDir, "proc-logs");
073    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
074    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
075    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
076    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
077  }
078
079  @AfterAll
080  public static void tearDown() throws Exception {
081    procExecutor.stop();
082    procStore.stop(false);
083    procExecutor.join();
084  }
085
086  @Test
087  public void testBypassSuspendProcedure() throws Exception {
088    final SuspendProcedure proc = new SuspendProcedure();
089    long id = procExecutor.submitProcedure(proc);
090    Thread.sleep(500);
091    // bypass the procedure
092    assertTrue(procExecutor.bypassProcedure(id, 30000, false, false));
093    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
094    LOG.info("{} finished", proc);
095  }
096
097  @Test
098  public void testStuckProcedure() throws Exception {
099    final StuckProcedure proc = new StuckProcedure();
100    long id = procExecutor.submitProcedure(proc);
101    Thread.sleep(500);
102    // bypass the procedure
103    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
104    // Since the procedure is stuck there, we need to restart the executor to recovery.
105    ProcedureTestingUtility.restart(procExecutor);
106    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
107    LOG.info("{} finished", proc);
108  }
109
110  @Test
111  public void testBypassingProcedureWithParent() throws Exception {
112    final RootProcedure proc = new RootProcedure();
113    long rootId = procExecutor.submitProcedure(proc);
114    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
115      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).size() > 0);
116    SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().stream()
117      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
118    assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false));
119    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
120    LOG.info("{} finished", proc);
121  }
122
123  @Test
124  public void testBypassingStuckStateMachineProcedure() throws Exception {
125    final StuckStateMachineProcedure proc =
126      new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START);
127    long id = procExecutor.submitProcedure(proc);
128    Thread.sleep(500);
129    // bypass the procedure
130    assertFalse(procExecutor.bypassProcedure(id, 1000, false, false));
131    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
132
133    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
134    LOG.info("{} finished", proc);
135  }
136
137  @Test
138  public void testBypassingProcedureWithParentRecursive() throws Exception {
139    final RootProcedure proc = new RootProcedure();
140    long rootId = procExecutor.submitProcedure(proc);
141    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
142      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).size() > 0);
143    SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor.getProcedures().stream()
144      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
145    assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true));
146    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
147    LOG.info("{} finished", proc);
148  }
149
150  @Test
151  public void testBypassingWaitingTimeoutProcedures() throws Exception {
152    final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure();
153    long id = procExecutor.submitProcedure(proc);
154    Thread.sleep(500);
155    // bypass the procedure
156    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
157
158    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
159    LOG.info("{} finished", proc);
160  }
161
162  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
163
164    public SuspendProcedure() {
165      super();
166    }
167
168    @Override
169    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
170      // Always suspend the procedure
171      throw new ProcedureSuspendedException();
172    }
173  }
174
175  public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
176
177    public StuckProcedure() {
178      super();
179    }
180
181    @Override
182    protected Procedure[] execute(final TestProcEnv env) {
183      try {
184        Thread.sleep(Long.MAX_VALUE);
185      } catch (Throwable t) {
186        LOG.debug("Sleep is interrupted.", t);
187      }
188      return null;
189    }
190
191  }
192
193  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
194    private boolean childSpwaned = false;
195
196    public RootProcedure() {
197      super();
198    }
199
200    @Override
201    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
202      if (!childSpwaned) {
203        childSpwaned = true;
204        return new Procedure[] { new SuspendProcedure() };
205      } else {
206        return null;
207      }
208    }
209  }
210
211  public static class WaitingTimeoutProcedure
212    extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
213    public WaitingTimeoutProcedure() {
214      super();
215    }
216
217    @Override
218    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
219      // Always suspend the procedure
220      setTimeout(50000);
221      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
222      skipPersistence();
223      throw new ProcedureSuspendedException();
224    }
225
226    @Override
227    protected synchronized boolean setTimeoutFailure(TestProcEnv env) {
228      setState(ProcedureProtos.ProcedureState.RUNNABLE);
229      procExecutor.getScheduler().addFront(this);
230      return false; // 'false' means that this procedure handled the timeout
231    }
232  }
233
234  public enum StuckStateMachineState {
235    START,
236    THEN,
237    END
238  }
239
240  public static class StuckStateMachineProcedure
241    extends ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> {
242    private AtomicBoolean stop = new AtomicBoolean(false);
243
244    public StuckStateMachineProcedure() {
245      super();
246    }
247
248    public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) {
249      super(env, initialState);
250    }
251
252    @Override
253    protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState)
254      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
255      switch (tState) {
256        case START:
257          LOG.info("PHASE 1: START");
258          setNextState(StuckStateMachineState.THEN);
259          return Flow.HAS_MORE_STATE;
260        case THEN:
261          if (stop.get()) {
262            setNextState(StuckStateMachineState.END);
263          }
264          return Flow.HAS_MORE_STATE;
265        case END:
266          return Flow.NO_MORE_STATE;
267        default:
268          throw new UnsupportedOperationException("unhandled state=" + tState);
269      }
270    }
271
272    @Override
273    protected StuckStateMachineState getState(int stateId) {
274      return StuckStateMachineState.values()[stateId];
275    }
276
277    @Override
278    protected int getStateId(StuckStateMachineState tState) {
279      return tState.ordinal();
280    }
281  }
282}