001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Category({MasterTests.class, SmallTests.class})
044public class TestYieldProcedures {
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047      HBaseClassTestRule.forClass(TestYieldProcedures.class);
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);
050
051  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
052  private static final Procedure NULL_PROC = null;
053
054  private ProcedureExecutor<TestProcEnv> procExecutor;
055  private TestScheduler procRunnables;
056  private ProcedureStore procStore;
057
058  private HBaseCommonTestingUtility htu;
059  private FileSystem fs;
060  private Path testDir;
061  private Path logDir;
062
063  @Before
064  public void setUp() throws IOException {
065    htu = new HBaseCommonTestingUtility();
066    testDir = htu.getDataTestDir();
067    fs = testDir.getFileSystem(htu.getConfiguration());
068    assertTrue(testDir.depth() > 1);
069
070    logDir = new Path(testDir, "proc-logs");
071    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
072    procRunnables = new TestScheduler();
073    procExecutor =
074      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
075    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
076    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
077  }
078
079  @After
080  public void tearDown() throws IOException {
081    procExecutor.stop();
082    procStore.stop(false);
083    fs.delete(logDir, true);
084  }
085
086  @Test
087  public void testYieldEachExecutionStep() throws Exception {
088    final int NUM_STATES = 3;
089
090    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
091    for (int i = 0; i < procs.length; ++i) {
092      procs[i] = new TestStateMachineProcedure(true, false);
093      procExecutor.submitProcedure(procs[i]);
094    }
095    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
096
097    for (int i = 0; i < procs.length; ++i) {
098      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
099
100      // verify execution
101      int index = 0;
102      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
103        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
104        assertEquals(false, info.isRollback());
105        assertEquals(execStep, info.getStep().ordinal());
106      }
107
108      // verify rollback
109      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
110        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
111        assertEquals(true, info.isRollback());
112        assertEquals(execStep, info.getStep().ordinal());
113      }
114    }
115
116    // check runnable queue stats
117    assertEquals(0, procRunnables.size());
118    assertEquals(0, procRunnables.addFrontCalls);
119    assertEquals(15, procRunnables.addBackCalls);
120    assertEquals(12, procRunnables.yieldCalls);
121    assertEquals(16, procRunnables.pollCalls);
122    assertEquals(3, procRunnables.completionCalls);
123  }
124
125  @Test
126  public void testYieldOnInterrupt() throws Exception {
127    final int NUM_STATES = 3;
128    int count = 0;
129
130    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
131    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
132
133    // test execute (we execute steps twice, one has the IE the other completes)
134    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
135    for (int i = 0; i < NUM_STATES; ++i) {
136      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
137      assertEquals(false, info.isRollback());
138      assertEquals(i, info.getStep().ordinal());
139
140      info = proc.getExecutionInfo().get(count++);
141      assertEquals(false, info.isRollback());
142      assertEquals(i, info.getStep().ordinal());
143    }
144
145    // test rollback (we execute steps twice, one has the IE the other completes)
146    for (int i = NUM_STATES - 1; i >= 0; --i) {
147      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
148      assertEquals(true, info.isRollback());
149      assertEquals(i, info.getStep().ordinal());
150
151      info = proc.getExecutionInfo().get(count++);
152      assertEquals(true, info.isRollback());
153      assertEquals(i, info.getStep().ordinal());
154    }
155
156    // check runnable queue stats
157    assertEquals(0, procRunnables.size());
158    assertEquals(0, procRunnables.addFrontCalls);
159    assertEquals(11, procRunnables.addBackCalls);
160    assertEquals(10, procRunnables.yieldCalls);
161    assertEquals(12, procRunnables.pollCalls);
162    assertEquals(1, procRunnables.completionCalls);
163  }
164
165  @Test
166  public void testYieldException() {
167    TestYieldProcedure proc = new TestYieldProcedure();
168    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
169    assertEquals(6, proc.step);
170
171    // check runnable queue stats
172    assertEquals(0, procRunnables.size());
173    assertEquals(0, procRunnables.addFrontCalls);
174    assertEquals(6, procRunnables.addBackCalls);
175    assertEquals(5, procRunnables.yieldCalls);
176    assertEquals(7, procRunnables.pollCalls);
177    assertEquals(1, procRunnables.completionCalls);
178  }
179
180  private static class TestProcEnv {
181    public final AtomicLong timestamp = new AtomicLong(0);
182
183    public long nextTimestamp() {
184      return timestamp.incrementAndGet();
185    }
186  }
187
188  public static class TestStateMachineProcedure
189      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
190    enum State { STATE_1, STATE_2, STATE_3 }
191
192    public static class ExecutionInfo {
193      private final boolean rollback;
194      private final long timestamp;
195      private final State step;
196
197      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
198        this.timestamp = timestamp;
199        this.step = step;
200        this.rollback = isRollback;
201      }
202
203      public State getStep() {
204        return step;
205      }
206
207      public long getTimestamp() {
208        return timestamp;
209      }
210
211      public boolean isRollback() {
212        return rollback;
213      }
214    }
215
216    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
217    private final AtomicBoolean aborted = new AtomicBoolean(false);
218    private final boolean throwInterruptOnceOnEachStep;
219    private final boolean abortOnFinalStep;
220
221    public TestStateMachineProcedure() {
222      this(false, false);
223    }
224
225    public TestStateMachineProcedure(boolean abortOnFinalStep,
226        boolean throwInterruptOnceOnEachStep) {
227      this.abortOnFinalStep = abortOnFinalStep;
228      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
229    }
230
231    public ArrayList<ExecutionInfo> getExecutionInfo() {
232      return executionInfo;
233    }
234
235    @Override
236    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
237        throws InterruptedException {
238      final long ts = env.nextTimestamp();
239      LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
240      executionInfo.add(new ExecutionInfo(ts, state, false));
241      Thread.sleep(150);
242
243      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
244        LOG.debug("THROW INTERRUPT");
245        throw new InterruptedException("test interrupt");
246      }
247
248      switch (state) {
249        case STATE_1:
250          setNextState(State.STATE_2);
251          break;
252        case STATE_2:
253          setNextState(State.STATE_3);
254          break;
255        case STATE_3:
256          if (abortOnFinalStep) {
257            setFailure("test", new IOException("Requested abort on final step"));
258          }
259          return Flow.NO_MORE_STATE;
260        default:
261          throw new UnsupportedOperationException();
262      }
263      return Flow.HAS_MORE_STATE;
264    }
265
266    @Override
267    protected void rollbackState(TestProcEnv env, final State state)
268        throws InterruptedException {
269      final long ts = env.nextTimestamp();
270      LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
271      executionInfo.add(new ExecutionInfo(ts, state, true));
272      Thread.sleep(150);
273
274      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
275        LOG.debug("THROW INTERRUPT");
276        throw new InterruptedException("test interrupt");
277      }
278
279      switch (state) {
280        case STATE_1:
281          break;
282        case STATE_2:
283          break;
284        case STATE_3:
285          break;
286        default:
287          throw new UnsupportedOperationException();
288      }
289    }
290
291    @Override
292    protected State getState(final int stateId) {
293      return State.values()[stateId];
294    }
295
296    @Override
297    protected int getStateId(final State state) {
298      return state.ordinal();
299    }
300
301    @Override
302    protected State getInitialState() {
303      return State.STATE_1;
304    }
305
306    @Override
307    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
308      return true;
309    }
310
311    @Override
312    protected boolean abort(TestProcEnv env) {
313      aborted.set(true);
314      return true;
315    }
316  }
317
318  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
319    private int step = 0;
320
321    public TestYieldProcedure() {
322    }
323
324    @Override
325    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
326      LOG.info("execute step " + step);
327      if (step++ < 5) {
328        throw new ProcedureYieldException();
329      }
330      return null;
331    }
332
333    @Override
334    protected void rollback(TestProcEnv env) {
335    }
336
337    @Override
338    protected boolean abort(TestProcEnv env) {
339      return false;
340    }
341
342    @Override
343    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
344      return true;
345    }
346
347    @Override
348    protected void serializeStateData(ProcedureStateSerializer serializer)
349        throws IOException {
350    }
351
352    @Override
353    protected void deserializeStateData(ProcedureStateSerializer serializer)
354        throws IOException {
355    }
356  }
357
358  private static class TestScheduler extends SimpleProcedureScheduler {
359    private int completionCalls;
360    private int addFrontCalls;
361    private int addBackCalls;
362    private int yieldCalls;
363    private int pollCalls;
364
365    public TestScheduler() {}
366
367    @Override
368    public void addFront(final Procedure proc) {
369      addFrontCalls++;
370      super.addFront(proc);
371    }
372
373    @Override
374    public void addBack(final Procedure proc) {
375      addBackCalls++;
376      super.addBack(proc);
377    }
378
379    @Override
380    public void yield(final Procedure proc) {
381      yieldCalls++;
382      super.yield(proc);
383    }
384
385    @Override
386    public Procedure poll() {
387      pollCalls++;
388      return super.poll();
389    }
390
391    @Override
392    public Procedure poll(long timeout, TimeUnit unit) {
393      pollCalls++;
394      return super.poll(timeout, unit);
395    }
396
397    @Override
398    public void completionCleanup(Procedure proc) {
399      completionCalls++;
400    }
401  }
402}