001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.hadoop.hbase.util.Threads;
037import org.junit.jupiter.api.AfterEach;
038import org.junit.jupiter.api.BeforeEach;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.api.Test;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
045
046@Tag(MasterTests.TAG)
047@Tag(MediumTests.TAG)
048public class TestProcedureRecovery {
049
  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class);

  // Handed to both procStore.start(...) and initAndStartWorkers(...) in setUp().
  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;

  // The fixtures below are static because the nested static procedure classes read procExecutor
  // and procSleepInterval directly. setUp() re-assigns all of them before every test.
  private static TestProcEnv procEnv;
  private static ProcedureExecutor<TestProcEnv> procExecutor;
  private static ProcedureStore procStore;
  // Argument of the Threads.sleepWithoutInterrupt(...) calls made by the test procedures
  // (presumably milliseconds -- TODO confirm). setUp() resets it to 0; the rollback tests raise it
  // to 2500 before issuing an abort.
  private static int procSleepInterval;

  // Per-test state, rebuilt by setUp().
  private HBaseCommonTestingUtil htu;
  // File system that owns testDir; used to list and delete logDir.
  private FileSystem fs;
  private Path testDir;
  // "proc-logs" directory under testDir, given to the procedure store.
  private Path logDir;
063
064  @BeforeEach
065  public void setUp() throws IOException {
066    htu = new HBaseCommonTestingUtil();
067    testDir = htu.getDataTestDir();
068    fs = testDir.getFileSystem(htu.getConfiguration());
069    assertTrue(testDir.depth() > 1);
070
071    logDir = new Path(testDir, "proc-logs");
072    procEnv = new TestProcEnv();
073    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
074    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
075    procExecutor.testing = new ProcedureExecutor.Testing();
076    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
077    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
078    procSleepInterval = 0;
079  }
080
081  @AfterEach
082  public void tearDown() throws IOException {
083    procExecutor.stop();
084    procStore.stop(false);
085    fs.delete(logDir, true);
086  }
087
  /**
   * Restarts the shared executor through {@link ProcedureTestingUtility#restart}, logging the
   * content of the procedure log directory before and after.
   */
  private void restart() throws Exception {
    dumpLogDirState();
    ProcedureTestingUtility.restart(procExecutor);
    dumpLogDirState();
  }
093
094  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
095    private int step = 0;
096
097    public TestSingleStepProcedure() {
098    }
099
100    @Override
101    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
102      env.waitOnLatch();
103      LOG.debug("execute procedure " + this + " step=" + step);
104      step++;
105      setResult(Bytes.toBytes(step));
106      return null;
107    }
108
109    @Override
110    protected void rollback(TestProcEnv env) {
111    }
112
113    @Override
114    protected boolean abort(TestProcEnv env) {
115      return true;
116    }
117  }
118
119  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
120    private AtomicBoolean abort = new AtomicBoolean(false);
121    private int step = 0;
122
123    @Override
124    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
125      env.waitOnLatch();
126      LOG.debug("execute procedure " + this + " step=" + step);
127      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
128      ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor);
129      step++;
130      Threads.sleepWithoutInterrupt(procSleepInterval);
131      if (isAborted()) {
132        setFailure(new RemoteProcedureException(getClass().getName(), new ProcedureAbortedException(
133          "got an abort at " + getClass().getName() + " step=" + step)));
134        return null;
135      }
136      return null;
137    }
138
139    @Override
140    protected void rollback(TestProcEnv env) {
141      LOG.debug("rollback procedure " + this + " step=" + step);
142      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
143      ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor);
144      step++;
145    }
146
147    @Override
148    protected boolean abort(TestProcEnv env) {
149      abort.set(true);
150      return true;
151    }
152
153    private boolean isAborted() {
154      boolean aborted = abort.get();
155      BaseTestStepProcedure proc = this;
156      while (proc.hasParent() && !aborted) {
157        proc = (BaseTestStepProcedure) procExecutor.getProcedure(proc.getParentProcId());
158        aborted = proc.isAborted();
159      }
160      return aborted;
161    }
162  }
163
164  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
165    public TestMultiStepProcedure() {
166    }
167
168    @Override
169    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
170      super.execute(env);
171      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
172    }
173
174    public static class Step1Procedure extends BaseTestStepProcedure {
175      public Step1Procedure() {
176      }
177
178      @Override
179      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
180        super.execute(env);
181        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
182      }
183    }
184
185    public static class Step2Procedure extends BaseTestStepProcedure {
186      public Step2Procedure() {
187      }
188    }
189  }
190
  /** Restarts the executor without having submitted any procedure; the restart must not throw. */
  @Test
  public void testNoopLoad() throws Exception {
    restart();
  }
195
196  @Test
197  public void testSingleStepProcRecovery() throws Exception {
198    Procedure proc = new TestSingleStepProcedure();
199    procExecutor.testing.killBeforeStoreUpdate = true;
200    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
201    assertFalse(procExecutor.isRunning());
202    procExecutor.testing.killBeforeStoreUpdate = false;
203
204    // Restart and verify that the procedures restart
205    long restartTs = EnvironmentEdgeManager.currentTime();
206    restart();
207    waitProcedure(procId);
208    Procedure<?> result = procExecutor.getResult(procId);
209    assertTrue(result.getLastUpdate() > restartTs);
210    ProcedureTestingUtility.assertProcNotFailed(result);
211    assertEquals(1, Bytes.toInt(result.getResult()));
212    long resultTs = result.getLastUpdate();
213
214    // Verify that after another restart the result is still there
215    restart();
216    result = procExecutor.getResult(procId);
217    ProcedureTestingUtility.assertProcNotFailed(result);
218    assertEquals(resultTs, result.getLastUpdate());
219    assertEquals(1, Bytes.toInt(result.getResult()));
220  }
221
222  @Test
223  public void testMultiStepProcRecovery() throws Exception {
224    // Step 0 - kill
225    Procedure proc = new TestMultiStepProcedure();
226    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
227    assertFalse(procExecutor.isRunning());
228
229    // Step 0 exec && Step 1 - kill
230    restart();
231    waitProcedure(procId);
232    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
233    assertFalse(procExecutor.isRunning());
234
235    // Step 1 exec && step 2 - kill
236    restart();
237    waitProcedure(procId);
238    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
239    assertFalse(procExecutor.isRunning());
240
241    // Step 2 exec
242    restart();
243    waitProcedure(procId);
244    assertTrue(procExecutor.isRunning());
245
246    // The procedure is completed
247    Procedure<?> result = procExecutor.getResult(procId);
248    ProcedureTestingUtility.assertProcNotFailed(result);
249  }
250
251  @Test
252  public void testMultiStepRollbackRecovery() throws Exception {
253    // Step 0 - kill
254    Procedure proc = new TestMultiStepProcedure();
255    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
256    assertFalse(procExecutor.isRunning());
257
258    // Step 0 exec && Step 1 - kill
259    restart();
260    waitProcedure(procId);
261    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
262    assertFalse(procExecutor.isRunning());
263
264    // Step 1 exec && step 2 - kill
265    restart();
266    waitProcedure(procId);
267    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
268    assertFalse(procExecutor.isRunning());
269
270    // Step 2 exec - rollback - kill
271    procSleepInterval = 2500;
272    restart();
273    assertTrue(procExecutor.abort(procId));
274    waitProcedure(procId);
275    assertFalse(procExecutor.isRunning());
276
277    // rollback - kill
278    restart();
279    waitProcedure(procId);
280    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
281    assertFalse(procExecutor.isRunning());
282
283    // rollback - complete
284    restart();
285    waitProcedure(procId);
286    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
287    assertFalse(procExecutor.isRunning());
288
289    // Restart the executor and get the result
290    restart();
291    waitProcedure(procId);
292
293    // The procedure is completed
294    Procedure<?> result = procExecutor.getResult(procId);
295    ProcedureTestingUtility.assertIsAbortException(result);
296  }
297
298  public static class TestStateMachineProcedure
299    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
300    enum State {
301      STATE_1,
302      STATE_2,
303      STATE_3,
304      DONE
305    }
306
307    public TestStateMachineProcedure() {
308    }
309
310    public TestStateMachineProcedure(final boolean testSubmitChildProc) {
311      this.submitChildProc = testSubmitChildProc;
312    }
313
314    private AtomicBoolean aborted = new AtomicBoolean(false);
315    private int iResult = 0;
316    private boolean submitChildProc = false;
317
318    @Override
319    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
320      switch (state) {
321        case STATE_1:
322          LOG.info("execute step 1 " + this);
323          setNextState(State.STATE_2);
324          iResult += 3;
325          break;
326        case STATE_2:
327          LOG.info("execute step 2 " + this);
328          if (submitChildProc) {
329            addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
330            setNextState(State.DONE);
331          } else {
332            setNextState(State.STATE_3);
333          }
334          iResult += 5;
335          break;
336        case STATE_3:
337          LOG.info("execute step 3 " + this);
338          Threads.sleepWithoutInterrupt(procSleepInterval);
339          if (aborted.get()) {
340            LOG.info("aborted step 3 " + this);
341            setAbortFailure("test", "aborted");
342            break;
343          }
344          setNextState(State.DONE);
345          iResult += 7;
346          break;
347        case DONE:
348          if (submitChildProc) {
349            addChildProcedure(new TestStateMachineProcedure());
350          }
351          iResult += 11;
352          setResult(Bytes.toBytes(iResult));
353          return Flow.NO_MORE_STATE;
354        default:
355          throw new UnsupportedOperationException();
356      }
357      return Flow.HAS_MORE_STATE;
358    }
359
360    @Override
361    protected boolean isRollbackSupported(State state) {
362      return true;
363    }
364
365    @Override
366    protected void rollbackState(TestProcEnv env, final State state) {
367      switch (state) {
368        case STATE_1:
369          LOG.info("rollback step 1 " + this);
370          break;
371        case STATE_2:
372          LOG.info("rollback step 2 " + this);
373          break;
374        case STATE_3:
375          LOG.info("rollback step 3 " + this);
376          break;
377        default:
378          throw new UnsupportedOperationException();
379      }
380    }
381
382    @Override
383    protected State getState(final int stateId) {
384      return State.values()[stateId];
385    }
386
387    @Override
388    protected int getStateId(final State state) {
389      return state.ordinal();
390    }
391
392    @Override
393    protected State getInitialState() {
394      return State.STATE_1;
395    }
396
397    @Override
398    protected boolean abort(TestProcEnv env) {
399      aborted.set(true);
400      return true;
401    }
402
403    @Override
404    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
405      super.serializeStateData(serializer);
406      Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult);
407      serializer.serialize(builder.build());
408    }
409
410    @Override
411    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
412      super.deserializeStateData(serializer);
413      Int32Value value = serializer.deserialize(Int32Value.class);
414      iResult = value.getValue();
415    }
416  }
417
418  @Test
419  public void testStateMachineMultipleLevel() throws Exception {
420    long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
421    // Wait the completion
422    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
423    Procedure<?> result = procExecutor.getResult(procId);
424    ProcedureTestingUtility.assertProcNotFailed(result);
425    assertEquals(19, Bytes.toInt(result.getResult()));
426    assertEquals(4, procExecutor.getLastProcId());
427  }
428
429  @Test
430  public void testStateMachineRecovery() throws Exception {
431    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
432    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true);
433
434    // Step 1 - kill
435    Procedure proc = new TestStateMachineProcedure();
436    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
437    assertFalse(procExecutor.isRunning());
438
439    // Step 1 exec && Step 2 - kill
440    restart();
441    waitProcedure(procId);
442    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
443    assertFalse(procExecutor.isRunning());
444
445    // Step 2 exec && step 3 - kill
446    restart();
447    waitProcedure(procId);
448    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
449    assertFalse(procExecutor.isRunning());
450
451    // Step 3 exec
452    restart();
453    waitProcedure(procId);
454    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
455    assertFalse(procExecutor.isRunning());
456
457    restart();
458    waitProcedure(procId);
459    assertTrue(procExecutor.isRunning());
460
461    // The procedure is completed
462    Procedure<?> result = procExecutor.getResult(procId);
463    ProcedureTestingUtility.assertProcNotFailed(result);
464    assertEquals(26, Bytes.toInt(result.getResult()));
465  }
466
467  @Test
468  public void testStateMachineRollbackRecovery() throws Exception {
469    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
470    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true);
471
472    // Step 1 - kill
473    Procedure proc = new TestStateMachineProcedure();
474    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
475    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
476    assertFalse(procExecutor.isRunning());
477
478    // Step 1 exec && Step 2 - kill
479    restart();
480    waitProcedure(procId);
481    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
482    assertFalse(procExecutor.isRunning());
483
484    // Step 2 exec && step 3 - kill
485    restart();
486    waitProcedure(procId);
487    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
488    assertFalse(procExecutor.isRunning());
489
490    // Step 3 exec - rollback step 3 - kill
491    procSleepInterval = 2500;
492    restart();
493    assertTrue(procExecutor.abort(procId));
494    waitProcedure(procId);
495    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
496    assertFalse(procExecutor.isRunning());
497
498    // Rollback step 3 - rollback step 2 - kill
499    restart();
500    waitProcedure(procId);
501    assertFalse(procExecutor.isRunning());
502    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
503
504    // Rollback step 2 - step 1 - kill
505    restart();
506    waitProcedure(procId);
507    assertFalse(procExecutor.isRunning());
508    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
509
510    // Rollback step 1 - complete
511    restart();
512    waitProcedure(procId);
513    assertTrue(procExecutor.isRunning());
514
515    // The procedure is completed
516    Procedure<?> result = procExecutor.getResult(procId);
517    ProcedureTestingUtility.assertIsAbortException(result);
518  }
519
  /**
   * Waits on the given procedure through {@link ProcedureTestingUtility#waitProcedure} using the
   * shared executor, then logs the content of the procedure log directory.
   * @param procId id of the procedure to wait for
   */
  private void waitProcedure(final long procId) {
    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
    dumpLogDirState();
  }
524
525  private void dumpLogDirState() {
526    try {
527      FileStatus[] files = fs.listStatus(logDir);
528      if (files != null && files.length > 0) {
529        for (FileStatus file : files) {
530          assertTrue(file.isFile(), file.toString());
531          LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
532        }
533      } else {
534        LOG.debug("no files under: " + logDir);
535      }
536    } catch (IOException e) {
537      LOG.warn("Unable to dump " + logDir, e);
538    }
539  }
540
541  private static class TestProcEnv {
542    private CountDownLatch latch = null;
543
544    /**
545     * set/unset a latch. every procedure execute() step will wait on the latch if any.
546     */
547    public void setWaitLatch(CountDownLatch latch) {
548      this.latch = latch;
549    }
550
551    public void waitOnLatch() throws InterruptedException {
552      if (latch != null) {
553        latch.await();
554      }
555    }
556  }
557}