001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Category({MasterTests.class, SmallTests.class})
044public class TestYieldProcedures {
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047      HBaseClassTestRule.forClass(TestYieldProcedures.class);
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);
050
051  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
052  private static final Procedure NULL_PROC = null;
053
054  private ProcedureExecutor<TestProcEnv> procExecutor;
055  private TestScheduler procRunnables;
056  private ProcedureStore procStore;
057
058  private HBaseCommonTestingUtility htu;
059  private FileSystem fs;
060  private Path testDir;
061  private Path logDir;
062
063  @Before
064  public void setUp() throws IOException {
065    htu = new HBaseCommonTestingUtility();
066    testDir = htu.getDataTestDir();
067    fs = testDir.getFileSystem(htu.getConfiguration());
068    assertTrue(testDir.depth() > 1);
069
070    logDir = new Path(testDir, "proc-logs");
071    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
072    procRunnables = new TestScheduler();
073    procExecutor =
074      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
075    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
076    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
077  }
078
079  @After
080  public void tearDown() throws IOException {
081    procExecutor.stop();
082    procStore.stop(false);
083    fs.delete(logDir, true);
084  }
085
086  @Test
087  public void testYieldEachExecutionStep() throws Exception {
088    final int NUM_STATES = 3;
089
090    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
091    for (int i = 0; i < procs.length; ++i) {
092      procs[i] = new TestStateMachineProcedure(true, false);
093      procExecutor.submitProcedure(procs[i]);
094    }
095    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
096
097    for (int i = 0; i < procs.length; ++i) {
098      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
099
100      // verify execution
101      int index = 0;
102      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
103        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
104        assertEquals(false, info.isRollback());
105        assertEquals(execStep, info.getStep().ordinal());
106      }
107
108      // verify rollback
109      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
110        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
111        assertEquals(true, info.isRollback());
112        assertEquals(execStep, info.getStep().ordinal());
113      }
114    }
115
116    // check runnable queue stats
117    assertEquals(0, procRunnables.size());
118    assertEquals(0, procRunnables.addFrontCalls);
119    assertEquals(15, procRunnables.addBackCalls);
120    assertEquals(12, procRunnables.yieldCalls);
121    assertEquals(16, procRunnables.pollCalls);
122    assertEquals(3, procRunnables.completionCalls);
123  }
124
125  @Test
126  public void testYieldOnInterrupt() throws Exception {
127    final int NUM_STATES = 3;
128    int count = 0;
129
130    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
131    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
132
133    // test execute (we execute steps twice, one has the IE the other completes)
134    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
135    for (int i = 0; i < NUM_STATES; ++i) {
136      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
137      assertEquals(false, info.isRollback());
138      assertEquals(i, info.getStep().ordinal());
139
140      info = proc.getExecutionInfo().get(count++);
141      assertEquals(false, info.isRollback());
142      assertEquals(i, info.getStep().ordinal());
143    }
144
145    // test rollback (we execute steps twice, rollback counts both IE and completed)
146    for (int i = NUM_STATES - 1; i >= 0; --i) {
147      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
148      assertEquals(true, info.isRollback());
149      assertEquals(i, info.getStep().ordinal());
150    }
151
152    for (int i = NUM_STATES - 1; i >= 0; --i) {
153      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
154      assertEquals(true, info.isRollback());
155      assertEquals(0, info.getStep().ordinal());
156    }
157
158    // check runnable queue stats
159    assertEquals(0, procRunnables.size());
160    assertEquals(0, procRunnables.addFrontCalls);
161    assertEquals(11, procRunnables.addBackCalls);
162    assertEquals(10, procRunnables.yieldCalls);
163    assertEquals(12, procRunnables.pollCalls);
164    assertEquals(1, procRunnables.completionCalls);
165  }
166
167  @Test
168  public void testYieldException() {
169    TestYieldProcedure proc = new TestYieldProcedure();
170    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
171    assertEquals(6, proc.step);
172
173    // check runnable queue stats
174    assertEquals(0, procRunnables.size());
175    assertEquals(0, procRunnables.addFrontCalls);
176    assertEquals(6, procRunnables.addBackCalls);
177    assertEquals(5, procRunnables.yieldCalls);
178    assertEquals(7, procRunnables.pollCalls);
179    assertEquals(1, procRunnables.completionCalls);
180  }
181
182  private static class TestProcEnv {
183    public final AtomicLong timestamp = new AtomicLong(0);
184
185    public long nextTimestamp() {
186      return timestamp.incrementAndGet();
187    }
188  }
189
190  public static class TestStateMachineProcedure
191      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
192    enum State { STATE_1, STATE_2, STATE_3 }
193
194    public static class ExecutionInfo {
195      private final boolean rollback;
196      private final long timestamp;
197      private final State step;
198
199      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
200        this.timestamp = timestamp;
201        this.step = step;
202        this.rollback = isRollback;
203      }
204
205      public State getStep() {
206        return step;
207      }
208
209      public long getTimestamp() {
210        return timestamp;
211      }
212
213      public boolean isRollback() {
214        return rollback;
215      }
216    }
217
218    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
219    private final AtomicBoolean aborted = new AtomicBoolean(false);
220    private final boolean throwInterruptOnceOnEachStep;
221    private final boolean abortOnFinalStep;
222
223    public TestStateMachineProcedure() {
224      this(false, false);
225    }
226
227    public TestStateMachineProcedure(boolean abortOnFinalStep,
228        boolean throwInterruptOnceOnEachStep) {
229      this.abortOnFinalStep = abortOnFinalStep;
230      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
231    }
232
233    public ArrayList<ExecutionInfo> getExecutionInfo() {
234      return executionInfo;
235    }
236
237    @Override
238    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
239        throws InterruptedException {
240      final long ts = env.nextTimestamp();
241      LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
242      executionInfo.add(new ExecutionInfo(ts, state, false));
243      Thread.sleep(150);
244
245      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
246        LOG.debug("THROW INTERRUPT");
247        throw new InterruptedException("test interrupt");
248      }
249
250      switch (state) {
251        case STATE_1:
252          setNextState(State.STATE_2);
253          break;
254        case STATE_2:
255          setNextState(State.STATE_3);
256          break;
257        case STATE_3:
258          if (abortOnFinalStep) {
259            setFailure("test", new IOException("Requested abort on final step"));
260          }
261          return Flow.NO_MORE_STATE;
262        default:
263          throw new UnsupportedOperationException();
264      }
265      return Flow.HAS_MORE_STATE;
266    }
267
268    @Override
269    protected void rollbackState(TestProcEnv env, final State state)
270        throws InterruptedException {
271      final long ts = env.nextTimestamp();
272      LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
273      executionInfo.add(new ExecutionInfo(ts, state, true));
274      Thread.sleep(150);
275
276      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
277        LOG.debug("THROW INTERRUPT");
278        throw new InterruptedException("test interrupt");
279      }
280
281      switch (state) {
282        case STATE_1:
283          break;
284        case STATE_2:
285          break;
286        case STATE_3:
287          break;
288        default:
289          throw new UnsupportedOperationException();
290      }
291    }
292
293    @Override
294    protected State getState(final int stateId) {
295      return State.values()[stateId];
296    }
297
298    @Override
299    protected int getStateId(final State state) {
300      return state.ordinal();
301    }
302
303    @Override
304    protected State getInitialState() {
305      return State.STATE_1;
306    }
307
308    @Override
309    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
310      return true;
311    }
312
313    @Override
314    protected boolean abort(TestProcEnv env) {
315      aborted.set(true);
316      return true;
317    }
318  }
319
320  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
321    private int step = 0;
322
323    public TestYieldProcedure() {
324    }
325
326    @Override
327    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
328      LOG.info("execute step " + step);
329      if (step++ < 5) {
330        throw new ProcedureYieldException();
331      }
332      return null;
333    }
334
335    @Override
336    protected void rollback(TestProcEnv env) {
337    }
338
339    @Override
340    protected boolean abort(TestProcEnv env) {
341      return false;
342    }
343
344    @Override
345    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
346      return true;
347    }
348
349    @Override
350    protected void serializeStateData(ProcedureStateSerializer serializer)
351        throws IOException {
352    }
353
354    @Override
355    protected void deserializeStateData(ProcedureStateSerializer serializer)
356        throws IOException {
357    }
358  }
359
360  private static class TestScheduler extends SimpleProcedureScheduler {
361    private int completionCalls;
362    private int addFrontCalls;
363    private int addBackCalls;
364    private int yieldCalls;
365    private int pollCalls;
366
367    public TestScheduler() {}
368
369    @Override
370    public void addFront(final Procedure proc) {
371      addFrontCalls++;
372      super.addFront(proc);
373    }
374
375    @Override
376    public void addBack(final Procedure proc) {
377      addBackCalls++;
378      super.addBack(proc);
379    }
380
381    @Override
382    public void yield(final Procedure proc) {
383      yieldCalls++;
384      super.yield(proc);
385    }
386
387    @Override
388    public Procedure poll() {
389      pollCalls++;
390      return super.poll();
391    }
392
393    @Override
394    public Procedure poll(long timeout, TimeUnit unit) {
395      pollCalls++;
396      return super.poll(timeout, unit);
397    }
398
399    @Override
400    public void completionCleanup(Procedure proc) {
401      completionCalls++;
402    }
403  }
404}