001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.stream.Collectors;
025
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
031import org.apache.hadoop.hbase.testclassification.MasterTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.junit.AfterClass;
034import org.junit.BeforeClass;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
042
043
044@Category({MasterTests.class, SmallTests.class})
045public class TestProcedureBypass {
046
047  @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
048      .forClass(TestProcedureBypass.class);
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class);
051
052  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
053
054  private static TestProcEnv procEnv;
055  private static ProcedureStore procStore;
056
057  private static ProcedureExecutor<TestProcEnv> procExecutor;
058
059  private static HBaseCommonTestingUtility htu;
060
061  private static FileSystem fs;
062  private static Path testDir;
063  private static Path logDir;
064
065  private static class TestProcEnv {
066  }
067
068  @BeforeClass
069  public static void setUp() throws Exception {
070    htu = new HBaseCommonTestingUtility();
071
072    // NOTE: The executor will be created by each test
073    procEnv = new TestProcEnv();
074    testDir = htu.getDataTestDir();
075    fs = testDir.getFileSystem(htu.getConfiguration());
076    assertTrue(testDir.depth() > 1);
077
078    logDir = new Path(testDir, "proc-logs");
079    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
080    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
081        procStore);
082    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
083    ProcedureTestingUtility
084        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
085  }
086
087  @Test
088  public void testBypassSuspendProcedure() throws Exception {
089    final SuspendProcedure proc = new SuspendProcedure();
090    long id = procExecutor.submitProcedure(proc);
091    Thread.sleep(500);
092    //bypass the procedure
093    assertTrue(procExecutor.bypassProcedure(id, 30000, false, false));
094    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
095    LOG.info("{} finished", proc);
096  }
097
098  @Test
099  public void testStuckProcedure() throws Exception {
100    final StuckProcedure proc = new StuckProcedure();
101    long id = procExecutor.submitProcedure(proc);
102    Thread.sleep(500);
103    //bypass the procedure
104    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
105    //Since the procedure is stuck there, we need to restart the executor to recovery.
106    ProcedureTestingUtility.restart(procExecutor);
107    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
108    LOG.info("{} finished", proc);
109  }
110
111  @Test
112  public void testBypassingProcedureWithParent() throws Exception {
113    final RootProcedure proc = new RootProcedure();
114    long rootId = procExecutor.submitProcedure(proc);
115    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
116      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList())
117      .size() > 0);
118    SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
119        .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
120    assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false));
121    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
122    LOG.info("{} finished", proc);
123  }
124
125  @Test
126  public void testBypassingStuckStateMachineProcedure() throws Exception {
127    final StuckStateMachineProcedure proc =
128        new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START);
129    long id = procExecutor.submitProcedure(proc);
130    Thread.sleep(500);
131    // bypass the procedure
132    assertFalse(procExecutor.bypassProcedure(id, 1000, false, false));
133    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
134
135    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
136    LOG.info("{} finished", proc);
137  }
138
139  @Test
140  public void testBypassingProcedureWithParentRecursive() throws Exception {
141    final RootProcedure proc = new RootProcedure();
142    long rootId = procExecutor.submitProcedure(proc);
143    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
144        .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList())
145        .size() > 0);
146    SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
147        .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
148    assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true));
149    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
150    LOG.info("{} finished", proc);
151  }
152
153  @Test
154  public void testBypassingWaitingTimeoutProcedures() throws Exception {
155    final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure();
156    long id = procExecutor.submitProcedure(proc);
157    Thread.sleep(500);
158    // bypass the procedure
159    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
160
161    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
162    LOG.info("{} finished", proc);
163  }
164
165  @AfterClass
166  public static void tearDown() throws Exception {
167    procExecutor.stop();
168    procStore.stop(false);
169    procExecutor.join();
170  }
171
172  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
173
174    public SuspendProcedure() {
175      super();
176    }
177
178    @Override
179    protected Procedure[] execute(final TestProcEnv env)
180        throws ProcedureSuspendedException {
181      // Always suspend the procedure
182      throw new ProcedureSuspendedException();
183    }
184  }
185
186  public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
187
188    public StuckProcedure() {
189      super();
190    }
191
192    @Override
193    protected Procedure[] execute(final TestProcEnv env) {
194      try {
195        Thread.sleep(Long.MAX_VALUE);
196      } catch (Throwable t) {
197        LOG.debug("Sleep is interrupted.", t);
198      }
199      return null;
200    }
201
202  }
203
204
205  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
206    private boolean childSpwaned = false;
207
208    public RootProcedure() {
209      super();
210    }
211
212    @Override
213    protected Procedure[] execute(final TestProcEnv env)
214        throws ProcedureSuspendedException {
215      if (!childSpwaned) {
216        childSpwaned = true;
217        return new Procedure[] {new SuspendProcedure()};
218      } else {
219        return null;
220      }
221    }
222  }
223
224  public static class WaitingTimeoutProcedure
225      extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
226    public WaitingTimeoutProcedure() {
227      super();
228    }
229
230    @Override
231    protected Procedure[] execute(final TestProcEnv env)
232        throws ProcedureSuspendedException {
233      // Always suspend the procedure
234      setTimeout(50000);
235      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
236      skipPersistence();
237      throw new ProcedureSuspendedException();
238    }
239
240    @Override
241    protected synchronized boolean setTimeoutFailure(TestProcEnv env) {
242      setState(ProcedureProtos.ProcedureState.RUNNABLE);
243      procExecutor.getScheduler().addFront(this);
244      return false; // 'false' means that this procedure handled the timeout
245    }
246  }
247
248  public enum StuckStateMachineState {
249    START, THEN, END
250  }
251
252  public static class StuckStateMachineProcedure extends
253      ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> {
254    private AtomicBoolean stop = new AtomicBoolean(false);
255
256    public StuckStateMachineProcedure() {
257      super();
258    }
259
260    public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) {
261      super(env, initialState);
262    }
263
264    @Override
265    protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState)
266            throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
267      switch (tState) {
268        case START:
269          LOG.info("PHASE 1: START");
270          setNextState(StuckStateMachineState.THEN);
271          return Flow.HAS_MORE_STATE;
272        case THEN:
273          if (stop.get()) {
274            setNextState(StuckStateMachineState.END);
275          }
276          return Flow.HAS_MORE_STATE;
277        case END:
278          return Flow.NO_MORE_STATE;
279        default:
280          throw new UnsupportedOperationException("unhandled state=" + tState);
281      }
282    }
283
284    @Override
285    protected StuckStateMachineState getState(int stateId) {
286      return StuckStateMachineState.values()[stateId];
287    }
288
289    @Override
290    protected int getStateId(StuckStateMachineState tState) {
291      return tState.ordinal();
292    }
293  }
294
295
296}