001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
032import org.apache.hadoop.hbase.procedure2.TestProcedureRecovery.TestStateMachineProcedure.State;
033import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
034import org.apache.hadoop.hbase.testclassification.MasterTests;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.junit.After;
040import org.junit.Before;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
048
049@Category({ MasterTests.class, MediumTests.class })
050public class TestProcedureRecovery {
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053    HBaseClassTestRule.forClass(TestProcedureRecovery.class);
054
055  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class);
056
057  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
058
059  private static TestProcEnv procEnv;
060  private static ProcedureExecutor<TestProcEnv> procExecutor;
061  private static ProcedureStore procStore;
062  private static int procSleepInterval;
063
064  private HBaseCommonTestingUtil htu;
065  private FileSystem fs;
066  private Path testDir;
067  private Path logDir;
068
069  @Before
070  public void setUp() throws IOException {
071    htu = new HBaseCommonTestingUtil();
072    testDir = htu.getDataTestDir();
073    fs = testDir.getFileSystem(htu.getConfiguration());
074    assertTrue(testDir.depth() > 1);
075
076    logDir = new Path(testDir, "proc-logs");
077    procEnv = new TestProcEnv();
078    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
079    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
080    procExecutor.testing = new ProcedureExecutor.Testing();
081    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
082    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
083    procSleepInterval = 0;
084  }
085
086  @After
087  public void tearDown() throws IOException {
088    procExecutor.stop();
089    procStore.stop(false);
090    fs.delete(logDir, true);
091  }
092
093  private void restart() throws Exception {
094    dumpLogDirState();
095    ProcedureTestingUtility.restart(procExecutor);
096    dumpLogDirState();
097  }
098
099  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
100    private int step = 0;
101
102    public TestSingleStepProcedure() {
103    }
104
105    @Override
106    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
107      env.waitOnLatch();
108      LOG.debug("execute procedure " + this + " step=" + step);
109      step++;
110      setResult(Bytes.toBytes(step));
111      return null;
112    }
113
114    @Override
115    protected void rollback(TestProcEnv env) {
116    }
117
118    @Override
119    protected boolean abort(TestProcEnv env) {
120      return true;
121    }
122  }
123
124  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
125    private AtomicBoolean abort = new AtomicBoolean(false);
126    private int step = 0;
127
128    @Override
129    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
130      env.waitOnLatch();
131      LOG.debug("execute procedure " + this + " step=" + step);
132      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
133      ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor);
134      step++;
135      Threads.sleepWithoutInterrupt(procSleepInterval);
136      if (isAborted()) {
137        setFailure(new RemoteProcedureException(getClass().getName(), new ProcedureAbortedException(
138          "got an abort at " + getClass().getName() + " step=" + step)));
139        return null;
140      }
141      return null;
142    }
143
144    @Override
145    protected void rollback(TestProcEnv env) {
146      LOG.debug("rollback procedure " + this + " step=" + step);
147      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
148      ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor);
149      step++;
150    }
151
152    @Override
153    protected boolean abort(TestProcEnv env) {
154      abort.set(true);
155      return true;
156    }
157
158    private boolean isAborted() {
159      boolean aborted = abort.get();
160      BaseTestStepProcedure proc = this;
161      while (proc.hasParent() && !aborted) {
162        proc = (BaseTestStepProcedure) procExecutor.getProcedure(proc.getParentProcId());
163        aborted = proc.isAborted();
164      }
165      return aborted;
166    }
167  }
168
169  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
170    public TestMultiStepProcedure() {
171    }
172
173    @Override
174    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
175      super.execute(env);
176      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
177    }
178
179    public static class Step1Procedure extends BaseTestStepProcedure {
180      public Step1Procedure() {
181      }
182
183      @Override
184      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
185        super.execute(env);
186        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
187      }
188    }
189
190    public static class Step2Procedure extends BaseTestStepProcedure {
191      public Step2Procedure() {
192      }
193    }
194  }
195
196  @Test
197  public void testNoopLoad() throws Exception {
198    restart();
199  }
200
201  @Test
202  public void testSingleStepProcRecovery() throws Exception {
203    Procedure proc = new TestSingleStepProcedure();
204    procExecutor.testing.killBeforeStoreUpdate = true;
205    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
206    assertFalse(procExecutor.isRunning());
207    procExecutor.testing.killBeforeStoreUpdate = false;
208
209    // Restart and verify that the procedures restart
210    long restartTs = EnvironmentEdgeManager.currentTime();
211    restart();
212    waitProcedure(procId);
213    Procedure<?> result = procExecutor.getResult(procId);
214    assertTrue(result.getLastUpdate() > restartTs);
215    ProcedureTestingUtility.assertProcNotFailed(result);
216    assertEquals(1, Bytes.toInt(result.getResult()));
217    long resultTs = result.getLastUpdate();
218
219    // Verify that after another restart the result is still there
220    restart();
221    result = procExecutor.getResult(procId);
222    ProcedureTestingUtility.assertProcNotFailed(result);
223    assertEquals(resultTs, result.getLastUpdate());
224    assertEquals(1, Bytes.toInt(result.getResult()));
225  }
226
227  @Test
228  public void testMultiStepProcRecovery() throws Exception {
229    // Step 0 - kill
230    Procedure proc = new TestMultiStepProcedure();
231    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
232    assertFalse(procExecutor.isRunning());
233
234    // Step 0 exec && Step 1 - kill
235    restart();
236    waitProcedure(procId);
237    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
238    assertFalse(procExecutor.isRunning());
239
240    // Step 1 exec && step 2 - kill
241    restart();
242    waitProcedure(procId);
243    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
244    assertFalse(procExecutor.isRunning());
245
246    // Step 2 exec
247    restart();
248    waitProcedure(procId);
249    assertTrue(procExecutor.isRunning());
250
251    // The procedure is completed
252    Procedure<?> result = procExecutor.getResult(procId);
253    ProcedureTestingUtility.assertProcNotFailed(result);
254  }
255
256  @Test
257  public void testMultiStepRollbackRecovery() throws Exception {
258    // Step 0 - kill
259    Procedure proc = new TestMultiStepProcedure();
260    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
261    assertFalse(procExecutor.isRunning());
262
263    // Step 0 exec && Step 1 - kill
264    restart();
265    waitProcedure(procId);
266    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
267    assertFalse(procExecutor.isRunning());
268
269    // Step 1 exec && step 2 - kill
270    restart();
271    waitProcedure(procId);
272    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
273    assertFalse(procExecutor.isRunning());
274
275    // Step 2 exec - rollback - kill
276    procSleepInterval = 2500;
277    restart();
278    assertTrue(procExecutor.abort(procId));
279    waitProcedure(procId);
280    assertFalse(procExecutor.isRunning());
281
282    // rollback - kill
283    restart();
284    waitProcedure(procId);
285    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
286    assertFalse(procExecutor.isRunning());
287
288    // rollback - complete
289    restart();
290    waitProcedure(procId);
291    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
292    assertFalse(procExecutor.isRunning());
293
294    // Restart the executor and get the result
295    restart();
296    waitProcedure(procId);
297
298    // The procedure is completed
299    Procedure<?> result = procExecutor.getResult(procId);
300    ProcedureTestingUtility.assertIsAbortException(result);
301  }
302
303  public static class TestStateMachineProcedure
304    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
305    enum State {
306      STATE_1,
307      STATE_2,
308      STATE_3,
309      DONE
310    }
311
312    public TestStateMachineProcedure() {
313    }
314
315    public TestStateMachineProcedure(final boolean testSubmitChildProc) {
316      this.submitChildProc = testSubmitChildProc;
317    }
318
319    private AtomicBoolean aborted = new AtomicBoolean(false);
320    private int iResult = 0;
321    private boolean submitChildProc = false;
322
323    @Override
324    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
325      switch (state) {
326        case STATE_1:
327          LOG.info("execute step 1 " + this);
328          setNextState(State.STATE_2);
329          iResult += 3;
330          break;
331        case STATE_2:
332          LOG.info("execute step 2 " + this);
333          if (submitChildProc) {
334            addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
335            setNextState(State.DONE);
336          } else {
337            setNextState(State.STATE_3);
338          }
339          iResult += 5;
340          break;
341        case STATE_3:
342          LOG.info("execute step 3 " + this);
343          Threads.sleepWithoutInterrupt(procSleepInterval);
344          if (aborted.get()) {
345            LOG.info("aborted step 3 " + this);
346            setAbortFailure("test", "aborted");
347            break;
348          }
349          setNextState(State.DONE);
350          iResult += 7;
351          break;
352        case DONE:
353          if (submitChildProc) {
354            addChildProcedure(new TestStateMachineProcedure());
355          }
356          iResult += 11;
357          setResult(Bytes.toBytes(iResult));
358          return Flow.NO_MORE_STATE;
359        default:
360          throw new UnsupportedOperationException();
361      }
362      return Flow.HAS_MORE_STATE;
363    }
364
365    @Override
366    protected boolean isRollbackSupported(State state) {
367      return true;
368    }
369
370    @Override
371    protected void rollbackState(TestProcEnv env, final State state) {
372      switch (state) {
373        case STATE_1:
374          LOG.info("rollback step 1 " + this);
375          break;
376        case STATE_2:
377          LOG.info("rollback step 2 " + this);
378          break;
379        case STATE_3:
380          LOG.info("rollback step 3 " + this);
381          break;
382        default:
383          throw new UnsupportedOperationException();
384      }
385    }
386
387    @Override
388    protected State getState(final int stateId) {
389      return State.values()[stateId];
390    }
391
392    @Override
393    protected int getStateId(final State state) {
394      return state.ordinal();
395    }
396
397    @Override
398    protected State getInitialState() {
399      return State.STATE_1;
400    }
401
402    @Override
403    protected boolean abort(TestProcEnv env) {
404      aborted.set(true);
405      return true;
406    }
407
408    @Override
409    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
410      super.serializeStateData(serializer);
411      Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult);
412      serializer.serialize(builder.build());
413    }
414
415    @Override
416    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
417      super.deserializeStateData(serializer);
418      Int32Value value = serializer.deserialize(Int32Value.class);
419      iResult = value.getValue();
420    }
421  }
422
423  @Test
424  public void testStateMachineMultipleLevel() throws Exception {
425    long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
426    // Wait the completion
427    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
428    Procedure<?> result = procExecutor.getResult(procId);
429    ProcedureTestingUtility.assertProcNotFailed(result);
430    assertEquals(19, Bytes.toInt(result.getResult()));
431    assertEquals(4, procExecutor.getLastProcId());
432  }
433
434  @Test
435  public void testStateMachineRecovery() throws Exception {
436    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
437    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true);
438
439    // Step 1 - kill
440    Procedure proc = new TestStateMachineProcedure();
441    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
442    assertFalse(procExecutor.isRunning());
443
444    // Step 1 exec && Step 2 - kill
445    restart();
446    waitProcedure(procId);
447    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
448    assertFalse(procExecutor.isRunning());
449
450    // Step 2 exec && step 3 - kill
451    restart();
452    waitProcedure(procId);
453    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
454    assertFalse(procExecutor.isRunning());
455
456    // Step 3 exec
457    restart();
458    waitProcedure(procId);
459    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
460    assertFalse(procExecutor.isRunning());
461
462    restart();
463    waitProcedure(procId);
464    assertTrue(procExecutor.isRunning());
465
466    // The procedure is completed
467    Procedure<?> result = procExecutor.getResult(procId);
468    ProcedureTestingUtility.assertProcNotFailed(result);
469    assertEquals(26, Bytes.toInt(result.getResult()));
470  }
471
472  @Test
473  public void testStateMachineRollbackRecovery() throws Exception {
474    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
475    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true);
476
477    // Step 1 - kill
478    Procedure proc = new TestStateMachineProcedure();
479    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
480    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
481    assertFalse(procExecutor.isRunning());
482
483    // Step 1 exec && Step 2 - kill
484    restart();
485    waitProcedure(procId);
486    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
487    assertFalse(procExecutor.isRunning());
488
489    // Step 2 exec && step 3 - kill
490    restart();
491    waitProcedure(procId);
492    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
493    assertFalse(procExecutor.isRunning());
494
495    // Step 3 exec - rollback step 3 - kill
496    procSleepInterval = 2500;
497    restart();
498    assertTrue(procExecutor.abort(procId));
499    waitProcedure(procId);
500    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
501    assertFalse(procExecutor.isRunning());
502
503    // Rollback step 3 - rollback step 2 - kill
504    restart();
505    waitProcedure(procId);
506    assertFalse(procExecutor.isRunning());
507    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
508
509    // Rollback step 2 - step 1 - kill
510    restart();
511    waitProcedure(procId);
512    assertFalse(procExecutor.isRunning());
513    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
514
515    // Rollback step 1 - complete
516    restart();
517    waitProcedure(procId);
518    assertTrue(procExecutor.isRunning());
519
520    // The procedure is completed
521    Procedure<?> result = procExecutor.getResult(procId);
522    ProcedureTestingUtility.assertIsAbortException(result);
523  }
524
525  private void waitProcedure(final long procId) {
526    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
527    dumpLogDirState();
528  }
529
530  private void dumpLogDirState() {
531    try {
532      FileStatus[] files = fs.listStatus(logDir);
533      if (files != null && files.length > 0) {
534        for (FileStatus file : files) {
535          assertTrue(file.toString(), file.isFile());
536          LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
537        }
538      } else {
539        LOG.debug("no files under: " + logDir);
540      }
541    } catch (IOException e) {
542      LOG.warn("Unable to dump " + logDir, e);
543    }
544  }
545
546  private static class TestProcEnv {
547    private CountDownLatch latch = null;
548
549    /**
550     * set/unset a latch. every procedure execute() step will wait on the latch if any.
551     */
552    public void setWaitLatch(CountDownLatch latch) {
553      this.latch = latch;
554    }
555
556    public void waitOnLatch() throws InterruptedException {
557      if (latch != null) {
558        latch.await();
559      }
560    }
561  }
562}