001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043@Category({ MasterTests.class, SmallTests.class })
044public class TestYieldProcedures {
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047    HBaseClassTestRule.forClass(TestYieldProcedures.class);
048
049  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);
050
051  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
052  private static final Procedure NULL_PROC = null;
053
054  private ProcedureExecutor<TestProcEnv> procExecutor;
055  private TestScheduler procRunnables;
056  private ProcedureStore procStore;
057
058  private HBaseCommonTestingUtility htu;
059  private FileSystem fs;
060  private Path testDir;
061  private Path logDir;
062
063  @Before
064  public void setUp() throws IOException {
065    htu = new HBaseCommonTestingUtility();
066    testDir = htu.getDataTestDir();
067    fs = testDir.getFileSystem(htu.getConfiguration());
068    assertTrue(testDir.depth() > 1);
069
070    logDir = new Path(testDir, "proc-logs");
071    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
072    procRunnables = new TestScheduler();
073    procExecutor =
074      new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
075    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
076    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
077  }
078
079  @After
080  public void tearDown() throws IOException {
081    procExecutor.stop();
082    procStore.stop(false);
083    fs.delete(logDir, true);
084  }
085
086  @Test
087  public void testYieldEachExecutionStep() throws Exception {
088    final int NUM_STATES = 3;
089
090    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
091    for (int i = 0; i < procs.length; ++i) {
092      procs[i] = new TestStateMachineProcedure(true, false);
093      procExecutor.submitProcedure(procs[i]);
094    }
095    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
096
097    for (int i = 0; i < procs.length; ++i) {
098      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());
099
100      // verify execution
101      int index = 0;
102      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
103        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
104        assertEquals(false, info.isRollback());
105        assertEquals(execStep, info.getStep().ordinal());
106      }
107
108      // verify rollback
109      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
110        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
111        assertEquals(true, info.isRollback());
112        assertEquals(execStep, info.getStep().ordinal());
113      }
114    }
115
116    // check runnable queue stats
117    assertEquals(0, procRunnables.size());
118    assertEquals(0, procRunnables.addFrontCalls);
119    assertEquals(15, procRunnables.addBackCalls);
120    assertEquals(12, procRunnables.yieldCalls);
121    assertEquals(16, procRunnables.pollCalls);
122    assertEquals(3, procRunnables.completionCalls);
123  }
124
125  @Test
126  public void testYieldOnInterrupt() throws Exception {
127    final int NUM_STATES = 3;
128    int count = 0;
129
130    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
131    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
132
133    // test execute (we execute steps twice, one has the IE the other completes)
134    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
135    for (int i = 0; i < NUM_STATES; ++i) {
136      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
137      assertEquals(false, info.isRollback());
138      assertEquals(i, info.getStep().ordinal());
139
140      info = proc.getExecutionInfo().get(count++);
141      assertEquals(false, info.isRollback());
142      assertEquals(i, info.getStep().ordinal());
143    }
144
145    // test rollback (we execute steps twice, rollback counts both IE and completed)
146    for (int i = NUM_STATES - 1; i >= 0; --i) {
147      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
148      assertEquals(true, info.isRollback());
149      assertEquals(i, info.getStep().ordinal());
150    }
151
152    for (int i = NUM_STATES - 1; i >= 0; --i) {
153      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
154      assertEquals(true, info.isRollback());
155      assertEquals(0, info.getStep().ordinal());
156    }
157
158    // check runnable queue stats
159    assertEquals(0, procRunnables.size());
160    assertEquals(0, procRunnables.addFrontCalls);
161    assertEquals(11, procRunnables.addBackCalls);
162    assertEquals(10, procRunnables.yieldCalls);
163    assertEquals(12, procRunnables.pollCalls);
164    assertEquals(1, procRunnables.completionCalls);
165  }
166
167  @Test
168  public void testYieldException() {
169    TestYieldProcedure proc = new TestYieldProcedure();
170    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
171    assertEquals(6, proc.step);
172
173    // check runnable queue stats
174    assertEquals(0, procRunnables.size());
175    assertEquals(0, procRunnables.addFrontCalls);
176    assertEquals(6, procRunnables.addBackCalls);
177    assertEquals(5, procRunnables.yieldCalls);
178    assertEquals(7, procRunnables.pollCalls);
179    assertEquals(1, procRunnables.completionCalls);
180  }
181
182  private static class TestProcEnv {
183    public final AtomicLong timestamp = new AtomicLong(0);
184
185    public long nextTimestamp() {
186      return timestamp.incrementAndGet();
187    }
188  }
189
190  public static class TestStateMachineProcedure
191    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
192    enum State {
193      STATE_1,
194      STATE_2,
195      STATE_3
196    }
197
198    public static class ExecutionInfo {
199      private final boolean rollback;
200      private final long timestamp;
201      private final State step;
202
203      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
204        this.timestamp = timestamp;
205        this.step = step;
206        this.rollback = isRollback;
207      }
208
209      public State getStep() {
210        return step;
211      }
212
213      public long getTimestamp() {
214        return timestamp;
215      }
216
217      public boolean isRollback() {
218        return rollback;
219      }
220    }
221
222    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
223    private final AtomicBoolean aborted = new AtomicBoolean(false);
224    private final boolean throwInterruptOnceOnEachStep;
225    private final boolean abortOnFinalStep;
226
227    public TestStateMachineProcedure() {
228      this(false, false);
229    }
230
231    public TestStateMachineProcedure(boolean abortOnFinalStep,
232      boolean throwInterruptOnceOnEachStep) {
233      this.abortOnFinalStep = abortOnFinalStep;
234      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
235    }
236
237    public ArrayList<ExecutionInfo> getExecutionInfo() {
238      return executionInfo;
239    }
240
241    @Override
242    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
243      throws InterruptedException {
244      final long ts = env.nextTimestamp();
245      LOG.info(getProcId() + " execute step " + state + " ts=" + ts);
246      executionInfo.add(new ExecutionInfo(ts, state, false));
247      Thread.sleep(150);
248
249      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
250        LOG.debug("THROW INTERRUPT");
251        throw new InterruptedException("test interrupt");
252      }
253
254      switch (state) {
255        case STATE_1:
256          setNextState(State.STATE_2);
257          break;
258        case STATE_2:
259          setNextState(State.STATE_3);
260          break;
261        case STATE_3:
262          if (abortOnFinalStep) {
263            setFailure("test", new IOException("Requested abort on final step"));
264          }
265          return Flow.NO_MORE_STATE;
266        default:
267          throw new UnsupportedOperationException();
268      }
269      return Flow.HAS_MORE_STATE;
270    }
271
272    @Override
273    protected void rollbackState(TestProcEnv env, final State state) throws InterruptedException {
274      final long ts = env.nextTimestamp();
275      LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts);
276      executionInfo.add(new ExecutionInfo(ts, state, true));
277      Thread.sleep(150);
278
279      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
280        LOG.debug("THROW INTERRUPT");
281        throw new InterruptedException("test interrupt");
282      }
283
284      switch (state) {
285        case STATE_1:
286          break;
287        case STATE_2:
288          break;
289        case STATE_3:
290          break;
291        default:
292          throw new UnsupportedOperationException();
293      }
294    }
295
296    @Override
297    protected State getState(final int stateId) {
298      return State.values()[stateId];
299    }
300
301    @Override
302    protected int getStateId(final State state) {
303      return state.ordinal();
304    }
305
306    @Override
307    protected State getInitialState() {
308      return State.STATE_1;
309    }
310
311    @Override
312    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
313      return true;
314    }
315
316    @Override
317    protected boolean abort(TestProcEnv env) {
318      aborted.set(true);
319      return true;
320    }
321  }
322
323  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
324    private int step = 0;
325
326    public TestYieldProcedure() {
327    }
328
329    @Override
330    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
331      LOG.info("execute step " + step);
332      if (step++ < 5) {
333        throw new ProcedureYieldException();
334      }
335      return null;
336    }
337
338    @Override
339    protected void rollback(TestProcEnv env) {
340    }
341
342    @Override
343    protected boolean abort(TestProcEnv env) {
344      return false;
345    }
346
347    @Override
348    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
349      return true;
350    }
351
352    @Override
353    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
354    }
355
356    @Override
357    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
358    }
359  }
360
361  private static class TestScheduler extends SimpleProcedureScheduler {
362    private int completionCalls;
363    private int addFrontCalls;
364    private int addBackCalls;
365    private int yieldCalls;
366    private int pollCalls;
367
368    public TestScheduler() {
369    }
370
371    @Override
372    public void addFront(final Procedure proc) {
373      addFrontCalls++;
374      super.addFront(proc);
375    }
376
377    @Override
378    public void addBack(final Procedure proc) {
379      addBackCalls++;
380      super.addBack(proc);
381    }
382
383    @Override
384    public void yield(final Procedure proc) {
385      yieldCalls++;
386      super.yield(proc);
387    }
388
389    @Override
390    public Procedure poll() {
391      pollCalls++;
392      return super.poll();
393    }
394
395    @Override
396    public Procedure poll(long timeout, TimeUnit unit) {
397      pollCalls++;
398      return super.poll(timeout, unit);
399    }
400
401    @Override
402    public void completionCleanup(Procedure proc) {
403      completionCalls++;
404    }
405  }
406}