001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.atomic.AtomicBoolean;
027import org.apache.hadoop.fs.FileStatus;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
033import org.apache.hadoop.hbase.testclassification.MasterTests;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.Threads;
038import org.junit.After;
039import org.junit.Before;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
047
048@Category({MasterTests.class, SmallTests.class})
049public class TestProcedureRecovery {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053      HBaseClassTestRule.forClass(TestProcedureRecovery.class);
054
055  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class);
056
057  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
058
059  private static TestProcEnv procEnv;
060  private static ProcedureExecutor<TestProcEnv> procExecutor;
061  private static ProcedureStore procStore;
062  private static int procSleepInterval;
063
064  private HBaseCommonTestingUtility htu;
065  private FileSystem fs;
066  private Path testDir;
067  private Path logDir;
068
069  @Before
070  public void setUp() throws IOException {
071    htu = new HBaseCommonTestingUtility();
072    testDir = htu.getDataTestDir();
073    fs = testDir.getFileSystem(htu.getConfiguration());
074    assertTrue(testDir.depth() > 1);
075
076    logDir = new Path(testDir, "proc-logs");
077    procEnv = new TestProcEnv();
078    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
079    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
080    procExecutor.testing = new ProcedureExecutor.Testing();
081    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
082    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
083    procSleepInterval = 0;
084  }
085
086  @After
087  public void tearDown() throws IOException {
088    procExecutor.stop();
089    procStore.stop(false);
090    fs.delete(logDir, true);
091  }
092
093  private void restart() throws Exception {
094    dumpLogDirState();
095    ProcedureTestingUtility.restart(procExecutor);
096    dumpLogDirState();
097  }
098
099  public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
100    private int step = 0;
101
102    public TestSingleStepProcedure() { }
103
104    @Override
105    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
106      env.waitOnLatch();
107      LOG.debug("execute procedure " + this + " step=" + step);
108      step++;
109      setResult(Bytes.toBytes(step));
110      return null;
111    }
112
113    @Override
114    protected void rollback(TestProcEnv env) { }
115
116    @Override
117    protected boolean abort(TestProcEnv env) { return true; }
118  }
119
120  public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
121    private AtomicBoolean abort = new AtomicBoolean(false);
122    private int step = 0;
123
124    @Override
125    protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
126      env.waitOnLatch();
127      LOG.debug("execute procedure " + this + " step=" + step);
128      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
129      step++;
130      Threads.sleepWithoutInterrupt(procSleepInterval);
131      if (isAborted()) {
132        setFailure(new RemoteProcedureException(getClass().getName(),
133          new ProcedureAbortedException(
134            "got an abort at " + getClass().getName() + " step=" + step)));
135        return null;
136      }
137      return null;
138    }
139
140    @Override
141    protected void rollback(TestProcEnv env) {
142      LOG.debug("rollback procedure " + this + " step=" + step);
143      ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor);
144      step++;
145    }
146
147    @Override
148    protected boolean abort(TestProcEnv env) {
149      abort.set(true);
150      return true;
151    }
152
153    private boolean isAborted() {
154      boolean aborted = abort.get();
155      BaseTestStepProcedure proc = this;
156      while (proc.hasParent() && !aborted) {
157        proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId());
158        aborted = proc.isAborted();
159      }
160      return aborted;
161    }
162  }
163
164  public static class TestMultiStepProcedure extends BaseTestStepProcedure {
165    public TestMultiStepProcedure() { }
166
167    @Override
168    public Procedure[] execute(TestProcEnv env) throws InterruptedException {
169      super.execute(env);
170      return isFailed() ? null : new Procedure[] { new Step1Procedure() };
171    }
172
173    public static class Step1Procedure extends BaseTestStepProcedure {
174      public Step1Procedure() { }
175
176      @Override
177      protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
178        super.execute(env);
179        return isFailed() ? null : new Procedure[] { new Step2Procedure() };
180      }
181    }
182
183    public static class Step2Procedure extends BaseTestStepProcedure {
184      public Step2Procedure() { }
185    }
186  }
187
188  @Test
189  public void testNoopLoad() throws Exception {
190    restart();
191  }
192
193  @Test
194  public void testSingleStepProcRecovery() throws Exception {
195    Procedure proc = new TestSingleStepProcedure();
196    procExecutor.testing.killBeforeStoreUpdate = true;
197    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
198    assertFalse(procExecutor.isRunning());
199    procExecutor.testing.killBeforeStoreUpdate = false;
200
201    // Restart and verify that the procedures restart
202    long restartTs = EnvironmentEdgeManager.currentTime();
203    restart();
204    waitProcedure(procId);
205    Procedure<?> result = procExecutor.getResult(procId);
206    assertTrue(result.getLastUpdate() > restartTs);
207    ProcedureTestingUtility.assertProcNotFailed(result);
208    assertEquals(1, Bytes.toInt(result.getResult()));
209    long resultTs = result.getLastUpdate();
210
211    // Verify that after another restart the result is still there
212    restart();
213    result = procExecutor.getResult(procId);
214    ProcedureTestingUtility.assertProcNotFailed(result);
215    assertEquals(resultTs, result.getLastUpdate());
216    assertEquals(1, Bytes.toInt(result.getResult()));
217  }
218
219  @Test
220  public void testMultiStepProcRecovery() throws Exception {
221    // Step 0 - kill
222    Procedure proc = new TestMultiStepProcedure();
223    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
224    assertFalse(procExecutor.isRunning());
225
226    // Step 0 exec && Step 1 - kill
227    restart();
228    waitProcedure(procId);
229    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
230    assertFalse(procExecutor.isRunning());
231
232    // Step 1 exec && step 2 - kill
233    restart();
234    waitProcedure(procId);
235    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
236    assertFalse(procExecutor.isRunning());
237
238    // Step 2 exec
239    restart();
240    waitProcedure(procId);
241    assertTrue(procExecutor.isRunning());
242
243    // The procedure is completed
244    Procedure<?> result = procExecutor.getResult(procId);
245    ProcedureTestingUtility.assertProcNotFailed(result);
246  }
247
248  @Test
249  public void testMultiStepRollbackRecovery() throws Exception {
250    // Step 0 - kill
251    Procedure proc = new TestMultiStepProcedure();
252    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
253    assertFalse(procExecutor.isRunning());
254
255    // Step 0 exec && Step 1 - kill
256    restart();
257    waitProcedure(procId);
258    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
259    assertFalse(procExecutor.isRunning());
260
261    // Step 1 exec && step 2 - kill
262    restart();
263    waitProcedure(procId);
264    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
265    assertFalse(procExecutor.isRunning());
266
267    // Step 2 exec - rollback - kill
268    procSleepInterval = 2500;
269    restart();
270    assertTrue(procExecutor.abort(procId));
271    waitProcedure(procId);
272    assertFalse(procExecutor.isRunning());
273
274    // rollback - kill
275    restart();
276    waitProcedure(procId);
277    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
278    assertFalse(procExecutor.isRunning());
279
280    // rollback - complete
281    restart();
282    waitProcedure(procId);
283    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
284    assertFalse(procExecutor.isRunning());
285
286    // Restart the executor and get the result
287    restart();
288    waitProcedure(procId);
289
290    // The procedure is completed
291    Procedure<?> result = procExecutor.getResult(procId);
292    ProcedureTestingUtility.assertIsAbortException(result);
293  }
294
295  public static class TestStateMachineProcedure
296      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
297    enum State { STATE_1, STATE_2, STATE_3, DONE }
298
299    public TestStateMachineProcedure() {}
300
301    public TestStateMachineProcedure(final boolean testSubmitChildProc) {
302      this.submitChildProc = testSubmitChildProc;
303    }
304
305    private AtomicBoolean aborted = new AtomicBoolean(false);
306    private int iResult = 0;
307    private boolean submitChildProc = false;
308
309    @Override
310    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) {
311      switch (state) {
312        case STATE_1:
313          LOG.info("execute step 1 " + this);
314          setNextState(State.STATE_2);
315          iResult += 3;
316          break;
317        case STATE_2:
318          LOG.info("execute step 2 " + this);
319          if (submitChildProc) {
320            addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure());
321            setNextState(State.DONE);
322          } else {
323            setNextState(State.STATE_3);
324          }
325          iResult += 5;
326          break;
327        case STATE_3:
328          LOG.info("execute step 3 " + this);
329          Threads.sleepWithoutInterrupt(procSleepInterval);
330          if (aborted.get()) {
331            LOG.info("aborted step 3 " + this);
332            setAbortFailure("test", "aborted");
333            break;
334          }
335          setNextState(State.DONE);
336          iResult += 7;
337          break;
338        case DONE:
339          if (submitChildProc) {
340            addChildProcedure(new TestStateMachineProcedure());
341          }
342          iResult += 11;
343          setResult(Bytes.toBytes(iResult));
344          return Flow.NO_MORE_STATE;
345        default:
346          throw new UnsupportedOperationException();
347      }
348      return Flow.HAS_MORE_STATE;
349    }
350
351    @Override
352    protected void rollbackState(TestProcEnv env, final State state) {
353      switch (state) {
354        case STATE_1:
355          LOG.info("rollback step 1 " + this);
356          break;
357        case STATE_2:
358          LOG.info("rollback step 2 " + this);
359          break;
360        case STATE_3:
361          LOG.info("rollback step 3 " + this);
362          break;
363        default:
364          throw new UnsupportedOperationException();
365      }
366    }
367
368    @Override
369    protected State getState(final int stateId) {
370      return State.values()[stateId];
371    }
372
373    @Override
374    protected int getStateId(final State state) {
375      return state.ordinal();
376    }
377
378    @Override
379    protected State getInitialState() {
380      return State.STATE_1;
381    }
382
383    @Override
384    protected boolean abort(TestProcEnv env) {
385      aborted.set(true);
386      return true;
387    }
388
389    @Override
390    protected void serializeStateData(ProcedureStateSerializer serializer)
391        throws IOException {
392      super.serializeStateData(serializer);
393      Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult);
394      serializer.serialize(builder.build());
395    }
396
397    @Override
398    protected void deserializeStateData(ProcedureStateSerializer serializer)
399        throws IOException {
400      super.deserializeStateData(serializer);
401      Int32Value value = serializer.deserialize(Int32Value.class);
402      iResult = value.getValue();
403    }
404  }
405
406  @Test
407  public void testStateMachineMultipleLevel() throws Exception {
408    long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true));
409    // Wait the completion
410    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
411    Procedure<?> result = procExecutor.getResult(procId);
412    ProcedureTestingUtility.assertProcNotFailed(result);
413    assertEquals(19, Bytes.toInt(result.getResult()));
414    assertEquals(4, procExecutor.getLastProcId());
415  }
416
417  @Test
418  public void testStateMachineRecovery() throws Exception {
419    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
420    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
421
422    // Step 1 - kill
423    Procedure proc = new TestStateMachineProcedure();
424    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
425    assertFalse(procExecutor.isRunning());
426
427    // Step 1 exec && Step 2 - kill
428    restart();
429    waitProcedure(procId);
430    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
431    assertFalse(procExecutor.isRunning());
432
433    // Step 2 exec && step 3 - kill
434    restart();
435    waitProcedure(procId);
436    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
437    assertFalse(procExecutor.isRunning());
438
439    // Step 3 exec
440    restart();
441    waitProcedure(procId);
442    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
443    assertFalse(procExecutor.isRunning());
444
445    restart();
446    waitProcedure(procId);
447    assertTrue(procExecutor.isRunning());
448
449    // The procedure is completed
450    Procedure<?> result = procExecutor.getResult(procId);
451    ProcedureTestingUtility.assertProcNotFailed(result);
452    assertEquals(26, Bytes.toInt(result.getResult()));
453  }
454
455  @Test
456  public void testStateMachineRollbackRecovery() throws Exception {
457    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true);
458    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true);
459
460    // Step 1 - kill
461    Procedure proc = new TestStateMachineProcedure();
462    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
463    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
464    assertFalse(procExecutor.isRunning());
465
466    // Step 1 exec && Step 2 - kill
467    restart();
468    waitProcedure(procId);
469    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
470    assertFalse(procExecutor.isRunning());
471
472    // Step 2 exec && step 3 - kill
473    restart();
474    waitProcedure(procId);
475    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
476    assertFalse(procExecutor.isRunning());
477
478    // Step 3 exec - rollback step 3 - kill
479    procSleepInterval = 2500;
480    restart();
481    assertTrue(procExecutor.abort(procId));
482    waitProcedure(procId);
483    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
484    assertFalse(procExecutor.isRunning());
485
486    // Rollback step 3 - rollback step 2 - kill
487    restart();
488    waitProcedure(procId);
489    assertFalse(procExecutor.isRunning());
490    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
491
492    // Rollback step 2 - step 1 - kill
493    restart();
494    waitProcedure(procId);
495    assertFalse(procExecutor.isRunning());
496    ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId);
497
498    // Rollback step 1 - complete
499    restart();
500    waitProcedure(procId);
501    assertTrue(procExecutor.isRunning());
502
503    // The procedure is completed
504    Procedure<?> result = procExecutor.getResult(procId);
505    ProcedureTestingUtility.assertIsAbortException(result);
506  }
507
508  private void waitProcedure(final long procId) {
509    ProcedureTestingUtility.waitProcedure(procExecutor, procId);
510    dumpLogDirState();
511  }
512
513  private void dumpLogDirState() {
514    try {
515      FileStatus[] files = fs.listStatus(logDir);
516      if (files != null && files.length > 0) {
517        for (FileStatus file: files) {
518          assertTrue(file.toString(), file.isFile());
519          LOG.debug("log file " + file.getPath() + " size=" + file.getLen());
520        }
521      } else {
522        LOG.debug("no files under: " + logDir);
523      }
524    } catch (IOException e) {
525      LOG.warn("Unable to dump " + logDir, e);
526    }
527  }
528
529  private static class TestProcEnv {
530    private CountDownLatch latch = null;
531
532    /**
533     * set/unset a latch. every procedure execute() step will wait on the latch if any.
534     */
535    public void setWaitLatch(CountDownLatch latch) {
536      this.latch = latch;
537    }
538
539    public void waitOnLatch() throws InterruptedException {
540      if (latch != null) {
541        latch.await();
542      }
543    }
544  }
545}