001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
032import org.apache.hadoop.hbase.procedure2.TestYieldProcedures.TestStateMachineProcedure.State;
033import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
034import org.apache.hadoop.hbase.testclassification.MasterTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.junit.After;
037import org.junit.Before;
038import org.junit.ClassRule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044@Category({ MasterTests.class, SmallTests.class })
045public class TestYieldProcedures {
046  @ClassRule
047  public static final HBaseClassTestRule CLASS_RULE =
048    HBaseClassTestRule.forClass(TestYieldProcedures.class);
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);
051
052  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
053  private static final Procedure NULL_PROC = null;
054
055  private ProcedureExecutor<TestProcEnv> procExecutor;
056  private TestScheduler procRunnables;
057  private ProcedureStore procStore;
058
059  private HBaseCommonTestingUtil htu;
060  private FileSystem fs;
061  private Path testDir;
062  private Path logDir;
063
064  @Before
065  public void setUp() throws IOException {
066    htu = new HBaseCommonTestingUtil();
067    testDir = htu.getDataTestDir();
068    fs = testDir.getFileSystem(htu.getConfiguration());
069    assertTrue(testDir.depth() > 1);
070
071    logDir = new Path(testDir, "proc-logs");
072    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
073    procRunnables = new TestScheduler();
074    procExecutor =
075      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
076    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
077    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
078  }
079
080  @After
081  public void tearDown() throws IOException {
082    procExecutor.stop();
083    procStore.stop(false);
084    fs.delete(logDir, true);
085  }
086
087  @Test
088  public void testYieldEachExecutionStep() throws Exception {
089    final int NUM_STATES = 3;
090
091    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
092    for (int i = 0; i < procs.length; ++i) {
093      procs[i] = new TestStateMachineProcedure(true, false);
094      procExecutor.submitProcedure(procs[i]);
095    }
096    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
097
098    for (int i = 0; i < procs.length; ++i) {
099      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
100
101      // verify execution
102      int index = 0;
103      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
104        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
105        assertEquals(false, info.isRollback());
106        assertEquals(execStep, info.getStep().ordinal());
107      }
108
109      // verify rollback
110      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
111        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
112        assertEquals(true, info.isRollback());
113        assertEquals(execStep, info.getStep().ordinal());
114      }
115    }
116
117    // check runnable queue stats
118    assertEquals(0, procRunnables.size());
119    assertEquals(0, procRunnables.addFrontCalls);
120    assertEquals(15, procRunnables.addBackCalls);
121    assertEquals(12, procRunnables.yieldCalls);
122    assertEquals(16, procRunnables.pollCalls);
123    assertEquals(3, procRunnables.completionCalls);
124  }
125
126  @Test
127  public void testYieldOnInterrupt() throws Exception {
128    final int NUM_STATES = 3;
129    int count = 0;
130
131    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
132    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
133
134    // test execute (we execute steps twice, one has the IE the other completes)
135    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
136    for (int i = 0; i < NUM_STATES; ++i) {
137      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
138      assertEquals(false, info.isRollback());
139      assertEquals(i, info.getStep().ordinal());
140
141      info = proc.getExecutionInfo().get(count++);
142      assertEquals(false, info.isRollback());
143      assertEquals(i, info.getStep().ordinal());
144    }
145
146    // test rollback (we execute steps twice, rollback counts both IE and completed)
147    for (int i = NUM_STATES - 1; i >= 0; --i) {
148      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
149      assertEquals(true, info.isRollback());
150      assertEquals(i, info.getStep().ordinal());
151    }
152
153    for (int i = NUM_STATES - 1; i >= 0; --i) {
154      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
155      assertEquals(true, info.isRollback());
156      assertEquals(0, info.getStep().ordinal());
157    }
158
159    // check runnable queue stats
160    assertEquals(0, procRunnables.size());
161    assertEquals(0, procRunnables.addFrontCalls);
162    assertEquals(11, procRunnables.addBackCalls);
163    assertEquals(10, procRunnables.yieldCalls);
164    assertEquals(12, procRunnables.pollCalls);
165    assertEquals(1, procRunnables.completionCalls);
166  }
167
168  @Test
169  public void testYieldException() {
170    TestYieldProcedure proc = new TestYieldProcedure();
171    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
172    assertEquals(6, proc.step);
173
174    // check runnable queue stats
175    assertEquals(0, procRunnables.size());
176    assertEquals(0, procRunnables.addFrontCalls);
177    assertEquals(6, procRunnables.addBackCalls);
178    assertEquals(5, procRunnables.yieldCalls);
179    assertEquals(7, procRunnables.pollCalls);
180    assertEquals(1, procRunnables.completionCalls);
181  }
182
183  private static class TestProcEnv {
184    public final AtomicLong timestamp = new AtomicLong(0);
185
186    public long nextTimestamp() {
187      return timestamp.incrementAndGet();
188    }
189  }
190
191  public static class TestStateMachineProcedure
192    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
193    enum State {
194      STATE_1,
195      STATE_2,
196      STATE_3
197    }
198
199    public static class ExecutionInfo {
200      private final boolean rollback;
201      private final long timestamp;
202      private final State step;
203
204      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
205        this.timestamp = timestamp;
206        this.step = step;
207        this.rollback = isRollback;
208      }
209
210      public State getStep() {
211        return step;
212      }
213
214      public long getTimestamp() {
215        return timestamp;
216      }
217
218      public boolean isRollback() {
219        return rollback;
220      }
221    }
222
223    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
224    private final AtomicBoolean aborted = new AtomicBoolean(false);
225    private final boolean throwInterruptOnceOnEachStep;
226    private final boolean abortOnFinalStep;
227
228    public TestStateMachineProcedure() {
229      this(false, false);
230    }
231
232    public TestStateMachineProcedure(boolean abortOnFinalStep,
233      boolean throwInterruptOnceOnEachStep) {
234      this.abortOnFinalStep = abortOnFinalStep;
235      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
236    }
237
238    public ArrayList<ExecutionInfo> getExecutionInfo() {
239      return executionInfo;
240    }
241
242    @Override
243    protected boolean isRollbackSupported(State state) {
244      return true;
245    }
246
247    @Override
248    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
249      throws InterruptedException {
250      final long ts = env.nextTimestamp();
251      LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
252      executionInfo.add(new ExecutionInfo(ts, state, false));
253      Thread.sleep(150);
254
255      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
256        LOG.debug("THROW INTERRUPT");
257        throw new InterruptedException("test interrupt");
258      }
259
260      switch (state) {
261        case STATE_1:
262          setNextState(State.STATE_2);
263          break;
264        case STATE_2:
265          setNextState(State.STATE_3);
266          break;
267        case STATE_3:
268          if (abortOnFinalStep) {
269            setFailure("test", new IOException("Requested abort on final step"));
270          }
271          return Flow.NO_MORE_STATE;
272        default:
273          throw new UnsupportedOperationException();
274      }
275      return Flow.HAS_MORE_STATE;
276    }
277
278    @Override
279    protected void rollbackState(TestProcEnv env, final State state) throws InterruptedException {
280      final long ts = env.nextTimestamp();
281      LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
282      executionInfo.add(new ExecutionInfo(ts, state, true));
283      Thread.sleep(150);
284
285      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
286        LOG.debug("THROW INTERRUPT");
287        throw new InterruptedException("test interrupt");
288      }
289
290      switch (state) {
291        case STATE_1:
292          break;
293        case STATE_2:
294          break;
295        case STATE_3:
296          break;
297        default:
298          throw new UnsupportedOperationException();
299      }
300    }
301
302    @Override
303    protected State getState(final int stateId) {
304      return State.values()[stateId];
305    }
306
307    @Override
308    protected int getStateId(final State state) {
309      return state.ordinal();
310    }
311
312    @Override
313    protected State getInitialState() {
314      return State.STATE_1;
315    }
316
317    @Override
318    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
319      return true;
320    }
321
322    @Override
323    protected boolean abort(TestProcEnv env) {
324      aborted.set(true);
325      return true;
326    }
327  }
328
329  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
330    private int step = 0;
331
332    public TestYieldProcedure() {
333    }
334
335    @Override
336    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
337      LOG.info("execute step " + step);
338      if (step++ < 5) {
339        throw new ProcedureYieldException();
340      }
341      return null;
342    }
343
344    @Override
345    protected void rollback(TestProcEnv env) {
346    }
347
348    @Override
349    protected boolean abort(TestProcEnv env) {
350      return false;
351    }
352
353    @Override
354    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
355      return true;
356    }
357
358    @Override
359    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
360    }
361
362    @Override
363    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
364    }
365  }
366
367  private static class TestScheduler extends SimpleProcedureScheduler {
368    private int completionCalls;
369    private int addFrontCalls;
370    private int addBackCalls;
371    private int yieldCalls;
372    private int pollCalls;
373
374    public TestScheduler() {
375    }
376
377    @Override
378    public void addFront(final Procedure proc) {
379      addFrontCalls++;
380      super.addFront(proc);
381    }
382
383    @Override
384    public void addBack(final Procedure proc) {
385      addBackCalls++;
386      super.addBack(proc);
387    }
388
389    @Override
390    public void yield(final Procedure proc) {
391      yieldCalls++;
392      super.yield(proc);
393    }
394
395    @Override
396    public Procedure poll() {
397      pollCalls++;
398      return super.poll();
399    }
400
401    @Override
402    public Procedure poll(long timeout, TimeUnit unit) {
403      pollCalls++;
404      return super.poll(timeout, unit);
405    }
406
407    @Override
408    public void completionCleanup(Procedure proc) {
409      completionCalls++;
410    }
411  }
412}