001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertTrue;
022
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.stream.Collectors;
025
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
031import org.apache.hadoop.hbase.testclassification.MasterTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.junit.AfterClass;
034import org.junit.BeforeClass;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
042
043
044@Category({MasterTests.class, SmallTests.class})
045public class TestProcedureBypass {
046
047  @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule
048      .forClass(TestProcedureBypass.class);
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureBypass.class);
051
052  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
053
054  private static TestProcEnv procEnv;
055  private static ProcedureStore procStore;
056
057  private static ProcedureExecutor<TestProcEnv> procExecutor;
058
059  private static HBaseCommonTestingUtility htu;
060
061  private static FileSystem fs;
062  private static Path testDir;
063  private static Path logDir;
064
065  private static class TestProcEnv {
066  }
067
068  @BeforeClass
069  public static void setUp() throws Exception {
070    htu = new HBaseCommonTestingUtility();
071
072    // NOTE: The executor will be created by each test
073    procEnv = new TestProcEnv();
074    testDir = htu.getDataTestDir();
075    fs = testDir.getFileSystem(htu.getConfiguration());
076    assertTrue(testDir.depth() > 1);
077
078    logDir = new Path(testDir, "proc-logs");
079    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
080    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
081        procStore);
082    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
083    ProcedureTestingUtility
084        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true);
085  }
086
087  @Test
088  public void testBypassSuspendProcedure() throws Exception {
089    final SuspendProcedure proc = new SuspendProcedure();
090    long id = procExecutor.submitProcedure(proc);
091    Thread.sleep(500);
092    //bypass the procedure
093    assertTrue(procExecutor.bypassProcedure(id, 30000, false, false));
094    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
095    LOG.info("{} finished", proc);
096  }
097
098  @Test
099  public void testStuckProcedure() throws Exception {
100    final StuckProcedure proc = new StuckProcedure();
101    long id = procExecutor.submitProcedure(proc);
102    Thread.sleep(500);
103    //bypass the procedure
104    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
105    //Since the procedure is stuck there, we need to restart the executor to recovery.
106    ProcedureTestingUtility.restart(procExecutor);
107    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
108    LOG.info("{} finished", proc);
109  }
110
111  @Test
112  public void testBypassingProcedureWithParent() throws Exception {
113    final RootProcedure proc = new RootProcedure();
114    long rootId = procExecutor.submitProcedure(proc);
115    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
116      .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList())
117      .size() > 0);
118    SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
119        .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
120    assertTrue(procExecutor.bypassProcedure(suspendProcedure.getProcId(), 1000, false, false));
121    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
122    LOG.info("{} finished", proc);
123  }
124
125  @Test
126  public void testBypassingProcedureWithParentRecursive() throws Exception {
127    final RootProcedure proc = new RootProcedure();
128    long rootId = procExecutor.submitProcedure(proc);
129    htu.waitFor(5000, () -> procExecutor.getProcedures().stream()
130        .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList())
131        .size() > 0);
132    SuspendProcedure suspendProcedure = (SuspendProcedure)procExecutor.getProcedures().stream()
133        .filter(p -> p.getParentProcId() == rootId).collect(Collectors.toList()).get(0);
134    assertTrue(procExecutor.bypassProcedure(rootId, 1000, false, true));
135    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
136    LOG.info("{} finished", proc);
137  }
138
139  @Test
140  public void testBypassingStuckStateMachineProcedure() throws Exception {
141    final StuckStateMachineProcedure proc =
142        new StuckStateMachineProcedure(procEnv, StuckStateMachineState.START);
143    long id = procExecutor.submitProcedure(proc);
144    Thread.sleep(500);
145    // bypass the procedure
146    assertFalse(procExecutor.bypassProcedure(id, 1000, false, false));
147    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
148
149    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
150    LOG.info("{} finished", proc);
151  }
152
153  @Test
154  public void testBypassingWaitingTimeoutProcedures() throws Exception {
155    final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure();
156    long id = procExecutor.submitProcedure(proc);
157    Thread.sleep(500);
158    // bypass the procedure
159    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
160
161    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
162    LOG.info("{} finished", proc);
163  }
164
165  @AfterClass
166  public static void tearDown() throws Exception {
167    procExecutor.stop();
168    procStore.stop(false);
169    procExecutor.join();
170  }
171
172  public static class SuspendProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
173
174    public SuspendProcedure() {
175      super();
176    }
177
178    @Override
179    protected Procedure[] execute(final TestProcEnv env)
180        throws ProcedureSuspendedException {
181      // Always suspend the procedure
182      throw new ProcedureSuspendedException();
183    }
184  }
185
186  public static class StuckProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
187
188    public StuckProcedure() {
189      super();
190    }
191
192    @Override
193    protected Procedure[] execute(final TestProcEnv env) {
194      try {
195        Thread.sleep(Long.MAX_VALUE);
196      } catch (Throwable t) {
197        LOG.debug("Sleep is interrupted.", t);
198      }
199      return null;
200    }
201
202  }
203
204  public static class WaitingTimeoutProcedure
205      extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
206    public WaitingTimeoutProcedure() {
207      super();
208    }
209
210    @Override
211    protected Procedure[] execute(final TestProcEnv env)
212        throws ProcedureSuspendedException {
213      // Always suspend the procedure
214      setTimeout(50000);
215      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
216      skipPersistence();
217      throw new ProcedureSuspendedException();
218    }
219
220    @Override
221    protected synchronized boolean setTimeoutFailure(TestProcEnv env) {
222      setState(ProcedureProtos.ProcedureState.RUNNABLE);
223      procExecutor.getScheduler().addFront(this);
224      return false; // 'false' means that this procedure handled the timeout
225    }
226  }
227
228  public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
229    private boolean childSpwaned = false;
230
231    public RootProcedure() {
232      super();
233    }
234
235    @Override
236    protected Procedure[] execute(final TestProcEnv env)
237        throws ProcedureSuspendedException {
238      if (!childSpwaned) {
239        childSpwaned = true;
240        return new Procedure[] {new SuspendProcedure()};
241      } else {
242        return null;
243      }
244    }
245  }
246
247  public enum StuckStateMachineState {
248    START, THEN, END
249  }
250
251  public static class StuckStateMachineProcedure extends
252      ProcedureTestingUtility.NoopStateMachineProcedure<TestProcEnv, StuckStateMachineState> {
253    private AtomicBoolean stop = new AtomicBoolean(false);
254
255    public StuckStateMachineProcedure() {
256      super();
257    }
258
259    public StuckStateMachineProcedure(TestProcEnv env, StuckStateMachineState initialState) {
260      super(env, initialState);
261    }
262
263    @Override
264    protected Flow executeFromState(TestProcEnv env, StuckStateMachineState tState)
265            throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
266      switch (tState) {
267        case START:
268          LOG.info("PHASE 1: START");
269          setNextState(StuckStateMachineState.THEN);
270          return Flow.HAS_MORE_STATE;
271        case THEN:
272          if (stop.get()) {
273            setNextState(StuckStateMachineState.END);
274          }
275          return Flow.HAS_MORE_STATE;
276        case END:
277          return Flow.NO_MORE_STATE;
278        default:
279          throw new UnsupportedOperationException("unhandled state=" + tState);
280      }
281    }
282
283    @Override
284    protected StuckStateMachineState getState(int stateId) {
285      return StuckStateMachineState.values()[stateId];
286    }
287
288    @Override
289    protected int getStateId(StuckStateMachineState tState) {
290      return tState.ordinal();
291    }
292  }
293}