001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Category({MasterTests.class, SmallTests.class})
044public class TestYieldProcedures {
045
046  @ClassRule
047  public static final HBaseClassTestRule CLASS_RULE =
048      HBaseClassTestRule.forClass(TestYieldProcedures.class);
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);
051
052  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
053  private static final Procedure NULL_PROC = null;
054
055  private ProcedureExecutor<TestProcEnv> procExecutor;
056  private TestScheduler procRunnables;
057  private ProcedureStore procStore;
058
059  private HBaseCommonTestingUtility htu;
060  private FileSystem fs;
061  private Path testDir;
062  private Path logDir;
063
064  @Before
065  public void setUp() throws IOException {
066    htu = new HBaseCommonTestingUtility();
067    testDir = htu.getDataTestDir();
068    fs = testDir.getFileSystem(htu.getConfiguration());
069    assertTrue(testDir.depth() > 1);
070
071    logDir = new Path(testDir, "proc-logs");
072    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
073    procRunnables = new TestScheduler();
074    procExecutor =
075      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
076    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
077    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false,
078            true);
079  }
080
081  @After
082  public void tearDown() throws IOException {
083    procExecutor.stop();
084    procStore.stop(false);
085    fs.delete(logDir, true);
086  }
087
088  @Test
089  public void testYieldEachExecutionStep() throws Exception {
090    final int NUM_STATES = 3;
091
092    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
093    for (int i = 0; i < procs.length; ++i) {
094      procs[i] = new TestStateMachineProcedure(true, false);
095      procExecutor.submitProcedure(procs[i]);
096    }
097    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
098
099    for (int i = 0; i < procs.length; ++i) {
100      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
101
102      // verify execution
103      int index = 0;
104      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
105        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
106        assertEquals(false, info.isRollback());
107        assertEquals(execStep, info.getStep().ordinal());
108      }
109
110      // verify rollback
111      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
112        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
113        assertEquals(true, info.isRollback());
114        assertEquals(execStep, info.getStep().ordinal());
115      }
116    }
117
118    // check runnable queue stats
119    assertEquals(0, procRunnables.size());
120    assertEquals(0, procRunnables.addFrontCalls);
121    assertEquals(15, procRunnables.addBackCalls);
122    assertEquals(12, procRunnables.yieldCalls);
123    assertEquals(16, procRunnables.pollCalls);
124    assertEquals(3, procRunnables.completionCalls);
125  }
126
127  @Test
128  public void testYieldOnInterrupt() throws Exception {
129    final int NUM_STATES = 3;
130    int count = 0;
131
132    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
133    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
134
135    // test execute (we execute steps twice, one has the IE the other completes)
136    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
137    for (int i = 0; i < NUM_STATES; ++i) {
138      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
139      assertEquals(false, info.isRollback());
140      assertEquals(i, info.getStep().ordinal());
141
142      info = proc.getExecutionInfo().get(count++);
143      assertEquals(false, info.isRollback());
144      assertEquals(i, info.getStep().ordinal());
145    }
146
147    // test rollback (we execute steps twice, rollback counts both IE and completed)
148    for (int i = NUM_STATES - 1; i >= 0; --i) {
149      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
150      assertEquals(true, info.isRollback());
151      assertEquals(i, info.getStep().ordinal());
152    }
153
154    for (int i = NUM_STATES - 1; i >= 0; --i) {
155      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
156      assertEquals(true, info.isRollback());
157      assertEquals(0, info.getStep().ordinal());
158    }
159
160    // check runnable queue stats
161    assertEquals(0, procRunnables.size());
162    assertEquals(0, procRunnables.addFrontCalls);
163    assertEquals(11, procRunnables.addBackCalls);
164    assertEquals(10, procRunnables.yieldCalls);
165    assertEquals(12, procRunnables.pollCalls);
166    assertEquals(1, procRunnables.completionCalls);
167  }
168
169  @Test
170  public void testYieldException() {
171    TestYieldProcedure proc = new TestYieldProcedure();
172    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
173    assertEquals(6, proc.step);
174
175    // check runnable queue stats
176    assertEquals(0, procRunnables.size());
177    assertEquals(0, procRunnables.addFrontCalls);
178    assertEquals(6, procRunnables.addBackCalls);
179    assertEquals(5, procRunnables.yieldCalls);
180    assertEquals(7, procRunnables.pollCalls);
181    assertEquals(1, procRunnables.completionCalls);
182  }
183
184  private static class TestProcEnv {
185    public final AtomicLong timestamp = new AtomicLong(0);
186
187    public long nextTimestamp() {
188      return timestamp.incrementAndGet();
189    }
190  }
191
192  public static class TestStateMachineProcedure
193      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
194    enum State { STATE_1, STATE_2, STATE_3 }
195
196    public static class ExecutionInfo {
197      private final boolean rollback;
198      private final long timestamp;
199      private final State step;
200
201      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
202        this.timestamp = timestamp;
203        this.step = step;
204        this.rollback = isRollback;
205      }
206
207      public State getStep() { return step; }
208      public long getTimestamp() { return timestamp; }
209      public boolean isRollback() { return rollback; }
210    }
211
212    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
213    private final AtomicBoolean aborted = new AtomicBoolean(false);
214    private final boolean throwInterruptOnceOnEachStep;
215    private final boolean abortOnFinalStep;
216
217    public TestStateMachineProcedure() {
218      this(false, false);
219    }
220
221    public TestStateMachineProcedure(boolean abortOnFinalStep,
222        boolean throwInterruptOnceOnEachStep) {
223      this.abortOnFinalStep = abortOnFinalStep;
224      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
225    }
226
227    public ArrayList<ExecutionInfo> getExecutionInfo() {
228      return executionInfo;
229    }
230
231    @Override
232    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
233        throws InterruptedException {
234      final long ts = env.nextTimestamp();
235      LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
236      executionInfo.add(new ExecutionInfo(ts, state, false));
237      Thread.sleep(150);
238
239      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
240        LOG.debug("THROW INTERRUPT");
241        throw new InterruptedException("test interrupt");
242      }
243
244      switch (state) {
245        case STATE_1:
246          setNextState(State.STATE_2);
247          break;
248        case STATE_2:
249          setNextState(State.STATE_3);
250          break;
251        case STATE_3:
252          if (abortOnFinalStep) {
253            setFailure("test", new IOException("Requested abort on final step"));
254          }
255          return Flow.NO_MORE_STATE;
256        default:
257          throw new UnsupportedOperationException();
258      }
259      return Flow.HAS_MORE_STATE;
260    }
261
262    @Override
263    protected void rollbackState(TestProcEnv env, final State state)
264        throws InterruptedException {
265      final long ts = env.nextTimestamp();
266      LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
267      executionInfo.add(new ExecutionInfo(ts, state, true));
268      Thread.sleep(150);
269
270      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
271        LOG.debug("THROW INTERRUPT");
272        throw new InterruptedException("test interrupt");
273      }
274
275      switch (state) {
276        case STATE_1:
277          break;
278        case STATE_2:
279          break;
280        case STATE_3:
281          break;
282        default:
283          throw new UnsupportedOperationException();
284      }
285    }
286
287    @Override
288    protected State getState(final int stateId) {
289      return State.values()[stateId];
290    }
291
292    @Override
293    protected int getStateId(final State state) {
294      return state.ordinal();
295    }
296
297    @Override
298    protected State getInitialState() {
299      return State.STATE_1;
300    }
301
302    @Override
303    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
304      return true;
305    }
306
307    @Override
308    protected boolean abort(TestProcEnv env) {
309      aborted.set(true);
310      return true;
311    }
312  }
313
314  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
315    private int step = 0;
316
317    public TestYieldProcedure() {
318    }
319
320    @Override
321    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
322      LOG.info("execute step " + step);
323      if (step++ < 5) {
324        throw new ProcedureYieldException();
325      }
326      return null;
327    }
328
329    @Override
330    protected void rollback(TestProcEnv env) {
331    }
332
333    @Override
334    protected boolean abort(TestProcEnv env) {
335      return false;
336    }
337
338    @Override
339    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
340      return true;
341    }
342
343    @Override
344    protected void serializeStateData(ProcedureStateSerializer serializer)
345        throws IOException {
346    }
347
348    @Override
349    protected void deserializeStateData(ProcedureStateSerializer serializer)
350        throws IOException {
351    }
352  }
353
354  private static class TestScheduler extends SimpleProcedureScheduler {
355    private int completionCalls;
356    private int addFrontCalls;
357    private int addBackCalls;
358    private int yieldCalls;
359    private int pollCalls;
360
361    public TestScheduler() {}
362
363    @Override
364    public void addFront(final Procedure proc) {
365      addFrontCalls++;
366      super.addFront(proc);
367    }
368
369    @Override
370    public void addBack(final Procedure proc) {
371      addBackCalls++;
372      super.addBack(proc);
373    }
374
375    @Override
376    public void yield(final Procedure proc) {
377      yieldCalls++;
378      super.yield(proc);
379    }
380
381    @Override
382    public Procedure poll() {
383      pollCalls++;
384      return super.poll();
385    }
386
387    @Override
388    public Procedure poll(long timeout, TimeUnit unit) {
389      pollCalls++;
390      return super.poll(timeout, unit);
391    }
392
393    @Override
394    public Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit) {
395      pollCalls++;
396      return super.poll(onlyUrgent, timeout, unit);
397    }
398
399    @Override
400    public void completionCleanup(Procedure proc) {
401      completionCalls++;
402    }
403  }
404}