001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicLong;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.jupiter.api.AfterEach;
036import org.junit.jupiter.api.BeforeEach;
037import org.junit.jupiter.api.Tag;
038import org.junit.jupiter.api.Test;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042@Tag(MasterTests.TAG)
043@Tag(SmallTests.TAG)
044public class TestYieldProcedures {
045
046  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);
047
048  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
049  private static final Procedure NULL_PROC = null;
050
051  private ProcedureExecutor<TestProcEnv> procExecutor;
052  private TestScheduler procRunnables;
053  private ProcedureStore procStore;
054
055  private HBaseCommonTestingUtil htu;
056  private FileSystem fs;
057  private Path testDir;
058  private Path logDir;
059
060  @BeforeEach
061  public void setUp() throws IOException {
062    htu = new HBaseCommonTestingUtil();
063    testDir = htu.getDataTestDir();
064    fs = testDir.getFileSystem(htu.getConfiguration());
065    assertTrue(testDir.depth() > 1);
066
067    logDir = new Path(testDir, "proc-logs");
068    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
069    procRunnables = new TestScheduler();
070    procExecutor =
071      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
072    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
073    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
074  }
075
076  @AfterEach
077  public void tearDown() throws IOException {
078    procExecutor.stop();
079    procStore.stop(false);
080    fs.delete(logDir, true);
081  }
082
083  @Test
084  public void testYieldEachExecutionStep() throws Exception {
085    final int NUM_STATES = 3;
086
087    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
088    for (int i = 0; i < procs.length; ++i) {
089      procs[i] = new TestStateMachineProcedure(true, false);
090      procExecutor.submitProcedure(procs[i]);
091    }
092    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
093
094    for (int i = 0; i < procs.length; ++i) {
095      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
096
097      // verify execution
098      int index = 0;
099      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
100        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
101        assertFalse(info.isRollback());
102        assertEquals(execStep, info.getStep().ordinal());
103      }
104
105      // verify rollback
106      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
107        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
108        assertTrue(info.isRollback());
109        assertEquals(execStep, info.getStep().ordinal());
110      }
111    }
112
113    // check runnable queue stats
114    assertEquals(0, procRunnables.size());
115    assertEquals(0, procRunnables.addFrontCalls);
116    assertEquals(15, procRunnables.addBackCalls);
117    assertEquals(12, procRunnables.yieldCalls);
118    assertEquals(16, procRunnables.pollCalls);
119    assertEquals(3, procRunnables.completionCalls);
120  }
121
122  @Test
123  public void testYieldOnInterrupt() throws Exception {
124    final int NUM_STATES = 3;
125    int count = 0;
126
127    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
128    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
129
130    // test execute (we execute steps twice, one has the IE the other completes)
131    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
132    for (int i = 0; i < NUM_STATES; ++i) {
133      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
134      assertFalse(info.isRollback());
135      assertEquals(i, info.getStep().ordinal());
136
137      info = proc.getExecutionInfo().get(count++);
138      assertFalse(info.isRollback());
139      assertEquals(i, info.getStep().ordinal());
140    }
141
142    // test rollback (we execute steps twice, rollback counts both IE and completed)
143    for (int i = NUM_STATES - 1; i >= 0; --i) {
144      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
145      assertTrue(info.isRollback());
146      assertEquals(i, info.getStep().ordinal());
147    }
148
149    for (int i = NUM_STATES - 1; i >= 0; --i) {
150      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
151      assertEquals(true, info.isRollback());
152      assertEquals(0, info.getStep().ordinal());
153    }
154
155    // check runnable queue stats
156    assertEquals(0, procRunnables.size());
157    assertEquals(0, procRunnables.addFrontCalls);
158    assertEquals(11, procRunnables.addBackCalls);
159    assertEquals(10, procRunnables.yieldCalls);
160    assertEquals(12, procRunnables.pollCalls);
161    assertEquals(1, procRunnables.completionCalls);
162  }
163
164  @Test
165  public void testYieldException() {
166    TestYieldProcedure proc = new TestYieldProcedure();
167    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
168    assertEquals(6, proc.step);
169
170    // check runnable queue stats
171    assertEquals(0, procRunnables.size());
172    assertEquals(0, procRunnables.addFrontCalls);
173    assertEquals(6, procRunnables.addBackCalls);
174    assertEquals(5, procRunnables.yieldCalls);
175    assertEquals(7, procRunnables.pollCalls);
176    assertEquals(1, procRunnables.completionCalls);
177  }
178
179  private static class TestProcEnv {
180    public final AtomicLong timestamp = new AtomicLong(0);
181
182    public long nextTimestamp() {
183      return timestamp.incrementAndGet();
184    }
185  }
186
187  public static class TestStateMachineProcedure
188    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
189    enum State {
190      STATE_1,
191      STATE_2,
192      STATE_3
193    }
194
195    public static class ExecutionInfo {
196      private final boolean rollback;
197      private final long timestamp;
198      private final State step;
199
200      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
201        this.timestamp = timestamp;
202        this.step = step;
203        this.rollback = isRollback;
204      }
205
206      public State getStep() {
207        return step;
208      }
209
210      public long getTimestamp() {
211        return timestamp;
212      }
213
214      public boolean isRollback() {
215        return rollback;
216      }
217    }
218
219    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
220    private final AtomicBoolean aborted = new AtomicBoolean(false);
221    private final boolean throwInterruptOnceOnEachStep;
222    private final boolean abortOnFinalStep;
223
224    public TestStateMachineProcedure() {
225      this(false, false);
226    }
227
228    public TestStateMachineProcedure(boolean abortOnFinalStep,
229      boolean throwInterruptOnceOnEachStep) {
230      this.abortOnFinalStep = abortOnFinalStep;
231      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
232    }
233
234    public ArrayList<ExecutionInfo> getExecutionInfo() {
235      return executionInfo;
236    }
237
238    @Override
239    protected boolean isRollbackSupported(State state) {
240      return true;
241    }
242
243    @Override
244    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
245      throws InterruptedException {
246      final long ts = env.nextTimestamp();
247      LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
248      executionInfo.add(new ExecutionInfo(ts, state, false));
249      Thread.sleep(150);
250
251      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
252        LOG.debug("THROW INTERRUPT");
253        throw new InterruptedException("test interrupt");
254      }
255
256      switch (state) {
257        case STATE_1:
258          setNextState(State.STATE_2);
259          break;
260        case STATE_2:
261          setNextState(State.STATE_3);
262          break;
263        case STATE_3:
264          if (abortOnFinalStep) {
265            setFailure("test", new IOException("Requested abort on final step"));
266          }
267          return Flow.NO_MORE_STATE;
268        default:
269          throw new UnsupportedOperationException();
270      }
271      return Flow.HAS_MORE_STATE;
272    }
273
274    @Override
275    protected void rollbackState(TestProcEnv env, final State state) throws InterruptedException {
276      final long ts = env.nextTimestamp();
277      LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
278      executionInfo.add(new ExecutionInfo(ts, state, true));
279      Thread.sleep(150);
280
281      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
282        LOG.debug("THROW INTERRUPT");
283        throw new InterruptedException("test interrupt");
284      }
285
286      switch (state) {
287        case STATE_1:
288          break;
289        case STATE_2:
290          break;
291        case STATE_3:
292          break;
293        default:
294          throw new UnsupportedOperationException();
295      }
296    }
297
298    @Override
299    protected State getState(final int stateId) {
300      return State.values()[stateId];
301    }
302
303    @Override
304    protected int getStateId(final State state) {
305      return state.ordinal();
306    }
307
308    @Override
309    protected State getInitialState() {
310      return State.STATE_1;
311    }
312
313    @Override
314    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
315      return true;
316    }
317
318    @Override
319    protected boolean abort(TestProcEnv env) {
320      aborted.set(true);
321      return true;
322    }
323  }
324
325  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
326    private int step = 0;
327
328    public TestYieldProcedure() {
329    }
330
331    @Override
332    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
333      LOG.info("execute step " + step);
334      if (step++ < 5) {
335        throw new ProcedureYieldException();
336      }
337      return null;
338    }
339
340    @Override
341    protected void rollback(TestProcEnv env) {
342    }
343
344    @Override
345    protected boolean abort(TestProcEnv env) {
346      return false;
347    }
348
349    @Override
350    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
351      return true;
352    }
353
354    @Override
355    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
356    }
357
358    @Override
359    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
360    }
361  }
362
363  private static class TestScheduler extends SimpleProcedureScheduler {
364    private int completionCalls;
365    private int addFrontCalls;
366    private int addBackCalls;
367    private int yieldCalls;
368    private int pollCalls;
369
370    public TestScheduler() {
371    }
372
373    @Override
374    public void addFront(final Procedure proc) {
375      addFrontCalls++;
376      super.addFront(proc);
377    }
378
379    @Override
380    public void addBack(final Procedure proc) {
381      addBackCalls++;
382      super.addBack(proc);
383    }
384
385    @Override
386    public void yield(final Procedure proc) {
387      yieldCalls++;
388      super.yield(proc);
389    }
390
391    @Override
392    public Procedure poll() {
393      pollCalls++;
394      return super.poll();
395    }
396
397    @Override
398    public Procedure poll(long timeout, TimeUnit unit) {
399      pollCalls++;
400      return super.poll(timeout, unit);
401    }
402
403    @Override
404    public void completionCleanup(Procedure proc) {
405      completionCalls++;
406    }
407  }
408}