001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
047
048@Category({MasterTests.class, SmallTests.class})
049public class TestProcedureRecovery {
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052      HBaseClassTestRule.forClass(TestProcedureRecovery.class);
053
054  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class);
055
056  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
057
058  private static TestProcEnv procEnv;
059  private static ProcedureExecutor<TestProcEnv> procExecutor;
060  private static ProcedureStore procStore;
061  private static int procSleepInterval;
062
063  private HBaseCommonTestingUtility htu;
064  private FileSystem fs;
065  private Path testDir;
066  private Path logDir;
067
068  @Before
069  public void setUp() throws IOException {
070    htu = new HBaseCommonTestingUtility();
071    testDir = htu.getDataTestDir();
072    fs = testDir.getFileSystem(htu.getConfiguration());
073    assertTrue(testDir.depth() > 1);
074
075    logDir = new Path(testDir, "proc-logs");
076    procEnv = new TestProcEnv();
077    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
078    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
079    procExecutor.testing = new ProcedureExecutor.Testing();
080    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
081    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
082    procSleepInterval = 0;
083  }
084
085  @After
086  public void tearDown() throws IOException {
087    procExecutor.stop();
088    procStore.stop(false);
089    fs.delete(logDir, true);
090  }
091
092  private void restart() throws Exception {
093    dumpLogDirState();
094    ProcedureTestingUtility.restart(procExecutor);
095    dumpLogDirState();
096  }
097
098  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
099    private int step = 0;
100
101    public TestSingleStepProcedure() { }
102
103    @Override
104    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
105      env.waitOnLatch();
106      LOG.debug("execute procedure " + this + " step=" + step);
107      step++;
108      setResult(Bytes.toBytes(step));
109      return null;
110    }
111
112    @Override
113    protected void rollback(TestProcEnv env) { }
114
115    @Override
116    protected boolean abort(TestProcEnv env) {
117      return true;
118    }
119  }
120
121  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
122    private AtomicBoolean abort = new AtomicBoolean(false);
123    private int step = 0;
124
125    @Override
126    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
127      env.waitOnLatch();
128      LOG.debug("execute procedure " + this + " step=" + step);
129      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
130      step++;
131      Threads.sleepWithoutInterrupt(procSleepInterval);
132      if (isAborted()) {
133        setFailure(new RemoteProcedureException(getClass().getName(),
134          new ProcedureAbortedException(
135            "got an abort at " + getClass().getName() + " step=" + step)));
136        return null;
137      }
138      return null;
139    }
140
141    @Override
142    protected void rollback(TestProcEnv env) {
143      LOG.debug("rollback procedure " + this + " step=" + step);
144      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
145      step++;
146    }
147
148    @Override
149    protected boolean abort(TestProcEnv env) {
150      abort.set(true);
151      return true;
152    }
153
154    private boolean isAborted() {
155      boolean aborted = abort.get();
156      BaseTestStepProcedure proc = this;
157      while (proc.hasParent() && !aborted) {
158        proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
159        aborted = proc.isAborted();
160      }
161      return aborted;
162    }
163  }
164
165  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
166    public TestMultiStepProcedure() { }
167
168    @Override
169    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
170      super.execute(env);
171      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
172    }
173
174    public static class Step1Procedure extends BaseTestStepProcedure {
175      public Step1Procedure() { }
176
177      @Override
178      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
179        super.execute(env);
180        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
181      }
182    }
183
184    public static class Step2Procedure extends BaseTestStepProcedure {
185      public Step2Procedure() { }
186    }
187  }
188
189  @Test
190  public void testNoopLoad() throws Exception {
191    restart();
192  }
193
194  @Test
195  public void testSingleStepProcRecovery() throws Exception {
196    Procedure proc = new TestSingleStepProcedure();
197    procExecutor.testing.killBeforeStoreUpdate = true;
198    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
199    assertFalse(procExecutor.isRunning());
200    procExecutor.testing.killBeforeStoreUpdate = false;
201
202    // Restart and verify that the procedures restart
203    long restartTs = EnvironmentEdgeManager.currentTime();
204    restart();
205    waitProcedure(procId);
206    Procedure<?> result = procExecutor.getResult(procId);
207    assertTrue(result.getLastUpdate() > restartTs);
208    ProcedureTestingUtility.assertProcNotFailed(result);
209    assertEquals(1, Bytes.toInt(result.getResult()));
210    long resultTs = result.getLastUpdate();
211
212    // Verify that after another restart the result is still there
213    restart();
214    result = procExecutor.getResult(procId);
215    ProcedureTestingUtility.assertProcNotFailed(result);
216    assertEquals(resultTs, result.getLastUpdate());
217    assertEquals(1, Bytes.toInt(result.getResult()));
218  }
219
220  @Test
221  public void testMultiStepProcRecovery() throws Exception {
222    // Step 0 - kill
223    Procedure proc = new TestMultiStepProcedure();
224    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
225    assertFalse(procExecutor.isRunning());
226
227    // Step 0 exec && Step 1 - kill
228    restart();
229    waitProcedure(procId);
230    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
231    assertFalse(procExecutor.isRunning());
232
233    // Step 1 exec && step 2 - kill
234    restart();
235    waitProcedure(procId);
236    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
237    assertFalse(procExecutor.isRunning());
238
239    // Step 2 exec
240    restart();
241    waitProcedure(procId);
242    assertTrue(procExecutor.isRunning());
243
244    // The procedure is completed
245    Procedure<?> result = procExecutor.getResult(procId);
246    ProcedureTestingUtility.assertProcNotFailed(result);
247  }
248
249  @Test
250  public void testMultiStepRollbackRecovery() throws Exception {
251    // Step 0 - kill
252    Procedure proc = new TestMultiStepProcedure();
253    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
254    assertFalse(procExecutor.isRunning());
255
256    // Step 0 exec && Step 1 - kill
257    restart();
258    waitProcedure(procId);
259    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
260    assertFalse(procExecutor.isRunning());
261
262    // Step 1 exec && step 2 - kill
263    restart();
264    waitProcedure(procId);
265    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
266    assertFalse(procExecutor.isRunning());
267
268    // Step 2 exec - rollback - kill
269    procSleepInterval = 2500;
270    restart();
271    assertTrue(procExecutor.abort(procId));
272    waitProcedure(procId);
273    assertFalse(procExecutor.isRunning());
274
275    // rollback - kill
276    restart();
277    waitProcedure(procId);
278    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
279    assertFalse(procExecutor.isRunning());
280
281    // rollback - complete
282    restart();
283    waitProcedure(procId);
284    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
285    assertFalse(procExecutor.isRunning());
286
287    // Restart the executor and get the result
288    restart();
289    waitProcedure(procId);
290
291    // The procedure is completed
292    Procedure<?> result = procExecutor.getResult(procId);
293    ProcedureTestingUtility.assertIsAbortException(result);
294  }
295
296  public static class TestStateMachineProcedure
297      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
298    enum State { STATE_1, STATE_2, STATE_3, DONE }
299
300    public TestStateMachineProcedure() {}
301
302    public TestStateMachineProcedure(final boolean testSubmitChildProc) {
303      this.submitChildProc = testSubmitChildProc;
304    }
305
306    private AtomicBoolean aborted = new AtomicBoolean(false);
307    private int iResult = 0;
308    private boolean submitChildProc = false;
309
310    @Override
311    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
312      switch (state) {
313        case STATE_1:
314          LOG.info("execute step 1 " + this);
315          setNextState(State.STATE_2);
316          iResult += 3;
317          break;
318        case STATE_2:
319          LOG.info("execute step 2 " + this);
320          if (submitChildProc) {
321            addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
322            setNextState(State.DONE);
323          } else {
324            setNextState(State.STATE_3);
325          }
326          iResult += 5;
327          break;
328        case STATE_3:
329          LOG.info("execute step 3 " + this);
330          Threads.sleepWithoutInterrupt(procSleepInterval);
331          if (aborted.get()) {
332            LOG.info("aborted step 3 " + this);
333            setAbortFailure("test", "aborted");
334            break;
335          }
336          setNextState(State.DONE);
337          iResult += 7;
338          break;
339        case DONE:
340          if (submitChildProc) {
341            addChildProcedure(new TestStateMachineProcedure());
342          }
343          iResult += 11;
344          setResult(Bytes.toBytes(iResult));
345          return Flow.NO_MORE_STATE;
346        default:
347          throw new UnsupportedOperationException();
348      }
349      return Flow.HAS_MORE_STATE;
350    }
351
352    @Override
353    protected void rollbackState(TestProcEnv env, final State state) {
354      switch (state) {
355        case STATE_1:
356          LOG.info("rollback step 1 " + this);
357          break;
358        case STATE_2:
359          LOG.info("rollback step 2 " + this);
360          break;
361        case STATE_3:
362          LOG.info("rollback step 3 " + this);
363          break;
364        default:
365          throw new UnsupportedOperationException();
366      }
367    }
368
369    @Override
370    protected State getState(final int stateId) {
371      return State.values()[stateId];
372    }
373
374    @Override
375    protected int getStateId(final State state) {
376      return state.ordinal();
377    }
378
379    @Override
380    protected State getInitialState() {
381      return State.STATE_1;
382    }
383
384    @Override
385    protected boolean abort(TestProcEnv env) {
386      aborted.set(true);
387      return true;
388    }
389
390    @Override
391    protected void serializeStateData(ProcedureStateSerializer serializer)
392        throws IOException {
393      super.serializeStateData(serializer);
394      Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult);
395      serializer.serialize(builder.build());
396    }
397
398    @Override
399    protected void deserializeStateData(ProcedureStateSerializer serializer)
400        throws IOException {
401      super.deserializeStateData(serializer);
402      Int32Value value = serializer.deserialize(Int32Value.class);
403      iResult = value.getValue();
404    }
405  }
406
407  @Test
408  public void testStateMachineMultipleLevel() throws Exception {
409    long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
410    // Wait the completion
411    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
412    Procedure<?> result = procExecutor.getResult(procId);
413    ProcedureTestingUtility.assertProcNotFailed(result);
414    assertEquals(19, Bytes.toInt(result.getResult()));
415    assertEquals(4, procExecutor.getLastProcId());
416  }
417
418  @Test
419  public void testStateMachineRecovery() throws Exception {
420    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
421    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
422
423    // Step 1 - kill
424    Procedure proc = new TestStateMachineProcedure();
425    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
426    assertFalse(procExecutor.isRunning());
427
428    // Step 1 exec && Step 2 - kill
429    restart();
430    waitProcedure(procId);
431    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
432    assertFalse(procExecutor.isRunning());
433
434    // Step 2 exec && step 3 - kill
435    restart();
436    waitProcedure(procId);
437    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
438    assertFalse(procExecutor.isRunning());
439
440    // Step 3 exec
441    restart();
442    waitProcedure(procId);
443    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
444    assertFalse(procExecutor.isRunning());
445
446    restart();
447    waitProcedure(procId);
448    assertTrue(procExecutor.isRunning());
449
450    // The procedure is completed
451    Procedure<?> result = procExecutor.getResult(procId);
452    ProcedureTestingUtility.assertProcNotFailed(result);
453    assertEquals(26, Bytes.toInt(result.getResult()));
454  }
455
456  @Test
457  public void testStateMachineRollbackRecovery() throws Exception {
458    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
459    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
460
461    // Step 1 - kill
462    Procedure proc = new TestStateMachineProcedure();
463    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
464    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
465    assertFalse(procExecutor.isRunning());
466
467    // Step 1 exec && Step 2 - kill
468    restart();
469    waitProcedure(procId);
470    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
471    assertFalse(procExecutor.isRunning());
472
473    // Step 2 exec && step 3 - kill
474    restart();
475    waitProcedure(procId);
476    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
477    assertFalse(procExecutor.isRunning());
478
479    // Step 3 exec - rollback step 3 - kill
480    procSleepInterval = 2500;
481    restart();
482    assertTrue(procExecutor.abort(procId));
483    waitProcedure(procId);
484    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
485    assertFalse(procExecutor.isRunning());
486
487    // Rollback step 3 - rollback step 2 - kill
488    restart();
489    waitProcedure(procId);
490    assertFalse(procExecutor.isRunning());
491    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
492
493    // Rollback step 2 - step 1 - kill
494    restart();
495    waitProcedure(procId);
496    assertFalse(procExecutor.isRunning());
497    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
498
499    // Rollback step 1 - complete
500    restart();
501    waitProcedure(procId);
502    assertTrue(procExecutor.isRunning());
503
504    // The procedure is completed
505    Procedure<?> result = procExecutor.getResult(procId);
506    ProcedureTestingUtility.assertIsAbortException(result);
507  }
508
509  private void waitProcedure(final long procId) {
510    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
511    dumpLogDirState();
512  }
513
514  private void dumpLogDirState() {
515    try {
516      FileStatus[] files = fs.listStatus(logDir);
517      if (files != null && files.length > 0) {
518        for (FileStatus file: files) {
519          assertTrue(file.toString(), file.isFile());
520          LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
521        }
522      } else {
523        LOG.debug("no files under: " + logDir);
524      }
525    } catch (IOException e) {
526      LOG.warn("Unable to dump " + logDir, e);
527    }
528  }
529
530  private static class TestProcEnv {
531    private CountDownLatch latch = null;
532
533    /**
534     * set/unset a latch. every procedure execute() step will wait on the latch if any.
535     */
536    public void setWaitLatch(CountDownLatch latch) {
537      this.latch = latch;
538    }
539
540    public void waitOnLatch() throws InterruptedException {
541      if (latch != null) {
542        latch.await();
543      }
544    }
545  }
546}