001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
047
048@Category({ MasterTests.class, MediumTests.class })
049public class TestProcedureRecovery {
  // Class-level JUnit rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestProcedureRecovery.class);

  // Shared by the outer test class and by the nested procedure classes.
  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class);

  // Passed to both procStore.start() and ProcedureTestingUtility.initAndStartWorkers() in setUp().
  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;

  // Static so the nested (static) procedure classes can reach them; re-assigned by setUp() before
  // every test.
  private static TestProcEnv procEnv;
  private static ProcedureExecutor<TestProcEnv> procExecutor;
  private static ProcedureStore procStore;
  // Value handed to Threads.sleepWithoutInterrupt() by the test procedures; setUp() resets it to 0
  // and the rollback tests raise it to 2500 before issuing an abort.
  private static int procSleepInterval;

  // Per-test-instance state, populated by setUp().
  private HBaseCommonTestingUtility htu;
  private FileSystem fs;
  private Path testDir;
  // "proc-logs" directory under testDir; given to the store and deleted in tearDown().
  private Path logDir;
067
068  @Before
069  public void setUp() throws IOException {
070    htu = new HBaseCommonTestingUtility();
071    testDir = htu.getDataTestDir();
072    fs = testDir.getFileSystem(htu.getConfiguration());
073    assertTrue(testDir.depth() > 1);
074
075    logDir = new Path(testDir, "proc-logs");
076    procEnv = new TestProcEnv();
077    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
078    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
079    procExecutor.testing = new ProcedureExecutor.Testing();
080    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
081    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
082    procSleepInterval = 0;
083  }
084
085  @After
086  public void tearDown() throws IOException {
087    procExecutor.stop();
088    procStore.stop(false);
089    fs.delete(logDir, true);
090  }
091
092  private void restart() throws Exception {
093    dumpLogDirState();
094    ProcedureTestingUtility.restart(procExecutor);
095    dumpLogDirState();
096  }
097
098  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
099    private int step = 0;
100
101    public TestSingleStepProcedure() {
102    }
103
104    @Override
105    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
106      env.waitOnLatch();
107      LOG.debug("execute procedure " + this + " step=" + step);
108      step++;
109      setResult(Bytes.toBytes(step));
110      return null;
111    }
112
113    @Override
114    protected void rollback(TestProcEnv env) {
115    }
116
117    @Override
118    protected boolean abort(TestProcEnv env) {
119      return true;
120    }
121  }
122
123  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
124    private AtomicBoolean abort = new AtomicBoolean(false);
125    private int step = 0;
126
127    @Override
128    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
129      env.waitOnLatch();
130      LOG.debug("execute procedure " + this + " step=" + step);
131      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
132      step++;
133      Threads.sleepWithoutInterrupt(procSleepInterval);
134      if (isAborted()) {
135        setFailure(new RemoteProcedureException(getClass().getName(), new ProcedureAbortedException(
136          "got an abort at " + getClass().getName() + " step=" + step)));
137        return null;
138      }
139      return null;
140    }
141
142    @Override
143    protected void rollback(TestProcEnv env) {
144      LOG.debug("rollback procedure " + this + " step=" + step);
145      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
146      step++;
147    }
148
149    @Override
150    protected boolean abort(TestProcEnv env) {
151      abort.set(true);
152      return true;
153    }
154
155    private boolean isAborted() {
156      boolean aborted = abort.get();
157      BaseTestStepProcedure proc = this;
158      while (proc.hasParent() && !aborted) {
159        proc = (BaseTestStepProcedure) procExecutor.getProcedure(proc.getParentProcId());
160        aborted = proc.isAborted();
161      }
162      return aborted;
163    }
164  }
165
166  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
167    public TestMultiStepProcedure() {
168    }
169
170    @Override
171    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
172      super.execute(env);
173      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
174    }
175
176    public static class Step1Procedure extends BaseTestStepProcedure {
177      public Step1Procedure() {
178      }
179
180      @Override
181      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
182        super.execute(env);
183        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
184      }
185    }
186
187    public static class Step2Procedure extends BaseTestStepProcedure {
188      public Step2Procedure() {
189      }
190    }
191  }
192
193  @Test
194  public void testNoopLoad() throws Exception {
195    restart();
196  }
197
198  @Test
199  public void testSingleStepProcRecovery() throws Exception {
200    Procedure proc = new TestSingleStepProcedure();
201    procExecutor.testing.killBeforeStoreUpdate = true;
202    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
203    assertFalse(procExecutor.isRunning());
204    procExecutor.testing.killBeforeStoreUpdate = false;
205
206    // Restart and verify that the procedures restart
207    long restartTs = EnvironmentEdgeManager.currentTime();
208    restart();
209    waitProcedure(procId);
210    Procedure<?> result = procExecutor.getResult(procId);
211    assertTrue(result.getLastUpdate() > restartTs);
212    ProcedureTestingUtility.assertProcNotFailed(result);
213    assertEquals(1, Bytes.toInt(result.getResult()));
214    long resultTs = result.getLastUpdate();
215
216    // Verify that after another restart the result is still there
217    restart();
218    result = procExecutor.getResult(procId);
219    ProcedureTestingUtility.assertProcNotFailed(result);
220    assertEquals(resultTs, result.getLastUpdate());
221    assertEquals(1, Bytes.toInt(result.getResult()));
222  }
223
224  @Test
225  public void testMultiStepProcRecovery() throws Exception {
226    // Step 0 - kill
227    Procedure proc = new TestMultiStepProcedure();
228    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
229    assertFalse(procExecutor.isRunning());
230
231    // Step 0 exec && Step 1 - kill
232    restart();
233    waitProcedure(procId);
234    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
235    assertFalse(procExecutor.isRunning());
236
237    // Step 1 exec && step 2 - kill
238    restart();
239    waitProcedure(procId);
240    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
241    assertFalse(procExecutor.isRunning());
242
243    // Step 2 exec
244    restart();
245    waitProcedure(procId);
246    assertTrue(procExecutor.isRunning());
247
248    // The procedure is completed
249    Procedure<?> result = procExecutor.getResult(procId);
250    ProcedureTestingUtility.assertProcNotFailed(result);
251  }
252
253  @Test
254  public void testMultiStepRollbackRecovery() throws Exception {
255    // Step 0 - kill
256    Procedure proc = new TestMultiStepProcedure();
257    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
258    assertFalse(procExecutor.isRunning());
259
260    // Step 0 exec && Step 1 - kill
261    restart();
262    waitProcedure(procId);
263    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
264    assertFalse(procExecutor.isRunning());
265
266    // Step 1 exec && step 2 - kill
267    restart();
268    waitProcedure(procId);
269    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
270    assertFalse(procExecutor.isRunning());
271
272    // Step 2 exec - rollback - kill
273    procSleepInterval = 2500;
274    restart();
275    assertTrue(procExecutor.abort(procId));
276    waitProcedure(procId);
277    assertFalse(procExecutor.isRunning());
278
279    // rollback - kill
280    restart();
281    waitProcedure(procId);
282    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
283    assertFalse(procExecutor.isRunning());
284
285    // rollback - complete
286    restart();
287    waitProcedure(procId);
288    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
289    assertFalse(procExecutor.isRunning());
290
291    // Restart the executor and get the result
292    restart();
293    waitProcedure(procId);
294
295    // The procedure is completed
296    Procedure<?> result = procExecutor.getResult(procId);
297    ProcedureTestingUtility.assertIsAbortException(result);
298  }
299
300  public static class TestStateMachineProcedure
301    extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
302    enum State {
303      STATE_1,
304      STATE_2,
305      STATE_3,
306      DONE
307    }
308
309    public TestStateMachineProcedure() {
310    }
311
312    public TestStateMachineProcedure(final boolean testSubmitChildProc) {
313      this.submitChildProc = testSubmitChildProc;
314    }
315
316    private AtomicBoolean aborted = new AtomicBoolean(false);
317    private int iResult = 0;
318    private boolean submitChildProc = false;
319
320    @Override
321    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
322      switch (state) {
323        case STATE_1:
324          LOG.info("execute step 1 " + this);
325          setNextState(State.STATE_2);
326          iResult += 3;
327          break;
328        case STATE_2:
329          LOG.info("execute step 2 " + this);
330          if (submitChildProc) {
331            addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
332            setNextState(State.DONE);
333          } else {
334            setNextState(State.STATE_3);
335          }
336          iResult += 5;
337          break;
338        case STATE_3:
339          LOG.info("execute step 3 " + this);
340          Threads.sleepWithoutInterrupt(procSleepInterval);
341          if (aborted.get()) {
342            LOG.info("aborted step 3 " + this);
343            setAbortFailure("test", "aborted");
344            break;
345          }
346          setNextState(State.DONE);
347          iResult += 7;
348          break;
349        case DONE:
350          if (submitChildProc) {
351            addChildProcedure(new TestStateMachineProcedure());
352          }
353          iResult += 11;
354          setResult(Bytes.toBytes(iResult));
355          return Flow.NO_MORE_STATE;
356        default:
357          throw new UnsupportedOperationException();
358      }
359      return Flow.HAS_MORE_STATE;
360    }
361
362    @Override
363    protected void rollbackState(TestProcEnv env, final State state) {
364      switch (state) {
365        case STATE_1:
366          LOG.info("rollback step 1 " + this);
367          break;
368        case STATE_2:
369          LOG.info("rollback step 2 " + this);
370          break;
371        case STATE_3:
372          LOG.info("rollback step 3 " + this);
373          break;
374        default:
375          throw new UnsupportedOperationException();
376      }
377    }
378
379    @Override
380    protected State getState(final int stateId) {
381      return State.values()[stateId];
382    }
383
384    @Override
385    protected int getStateId(final State state) {
386      return state.ordinal();
387    }
388
389    @Override
390    protected State getInitialState() {
391      return State.STATE_1;
392    }
393
394    @Override
395    protected boolean abort(TestProcEnv env) {
396      aborted.set(true);
397      return true;
398    }
399
400    @Override
401    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
402      super.serializeStateData(serializer);
403      Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult);
404      serializer.serialize(builder.build());
405    }
406
407    @Override
408    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
409      super.deserializeStateData(serializer);
410      Int32Value value = serializer.deserialize(Int32Value.class);
411      iResult = value.getValue();
412    }
413  }
414
415  @Test
416  public void testStateMachineMultipleLevel() throws Exception {
417    long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
418    // Wait the completion
419    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
420    Procedure<?> result = procExecutor.getResult(procId);
421    ProcedureTestingUtility.assertProcNotFailed(result);
422    assertEquals(19, Bytes.toInt(result.getResult()));
423    assertEquals(4, procExecutor.getLastProcId());
424  }
425
426  @Test
427  public void testStateMachineRecovery() throws Exception {
428    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
429    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
430
431    // Step 1 - kill
432    Procedure proc = new TestStateMachineProcedure();
433    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
434    assertFalse(procExecutor.isRunning());
435
436    // Step 1 exec && Step 2 - kill
437    restart();
438    waitProcedure(procId);
439    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
440    assertFalse(procExecutor.isRunning());
441
442    // Step 2 exec && step 3 - kill
443    restart();
444    waitProcedure(procId);
445    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
446    assertFalse(procExecutor.isRunning());
447
448    // Step 3 exec
449    restart();
450    waitProcedure(procId);
451    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
452    assertFalse(procExecutor.isRunning());
453
454    restart();
455    waitProcedure(procId);
456    assertTrue(procExecutor.isRunning());
457
458    // The procedure is completed
459    Procedure<?> result = procExecutor.getResult(procId);
460    ProcedureTestingUtility.assertProcNotFailed(result);
461    assertEquals(26, Bytes.toInt(result.getResult()));
462  }
463
464  @Test
465  public void testStateMachineRollbackRecovery() throws Exception {
466    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
467    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
468
469    // Step 1 - kill
470    Procedure proc = new TestStateMachineProcedure();
471    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
472    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
473    assertFalse(procExecutor.isRunning());
474
475    // Step 1 exec && Step 2 - kill
476    restart();
477    waitProcedure(procId);
478    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
479    assertFalse(procExecutor.isRunning());
480
481    // Step 2 exec && step 3 - kill
482    restart();
483    waitProcedure(procId);
484    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
485    assertFalse(procExecutor.isRunning());
486
487    // Step 3 exec - rollback step 3 - kill
488    procSleepInterval = 2500;
489    restart();
490    assertTrue(procExecutor.abort(procId));
491    waitProcedure(procId);
492    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
493    assertFalse(procExecutor.isRunning());
494
495    // Rollback step 3 - rollback step 2 - kill
496    restart();
497    waitProcedure(procId);
498    assertFalse(procExecutor.isRunning());
499    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
500
501    // Rollback step 2 - step 1 - kill
502    restart();
503    waitProcedure(procId);
504    assertFalse(procExecutor.isRunning());
505    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
506
507    // Rollback step 1 - complete
508    restart();
509    waitProcedure(procId);
510    assertTrue(procExecutor.isRunning());
511
512    // The procedure is completed
513    Procedure<?> result = procExecutor.getResult(procId);
514    ProcedureTestingUtility.assertIsAbortException(result);
515  }
516
517  private void waitProcedure(final long procId) {
518    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
519    dumpLogDirState();
520  }
521
522  private void dumpLogDirState() {
523    try {
524      FileStatus[] files = fs.listStatus(logDir);
525      if (files != null && files.length > 0) {
526        for (FileStatus file : files) {
527          assertTrue(file.toString(), file.isFile());
528          LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
529        }
530      } else {
531        LOG.debug("no files under: " + logDir);
532      }
533    } catch (IOException e) {
534      LOG.warn("Unable to dump " + logDir, e);
535    }
536  }
537
538  private static class TestProcEnv {
539    private CountDownLatch latch = null;
540
541    /**
542     * set/unset a latch. every procedure execute() step will wait on the latch if any.
543     */
544    public void setWaitLatch(CountDownLatch latch) {
545      this.latch = latch;
546    }
547
548    public void waitOnLatch() throws InterruptedException {
549      if (latch != null) {
550        latch.await();
551      }
552    }
553  }
554}