001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
033import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
034import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
035import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
036import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
037import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
038import org.apache.hadoop.hbase.util.NonceKey;
039import org.apache.hadoop.hbase.util.Threads;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
044import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
045
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
047
048public final class ProcedureTestingUtility {
049  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);
050
051  private ProcedureTestingUtility() {
052  }
053
054  public static ProcedureStore createStore(final Configuration conf, final Path dir)
055      throws IOException {
056    return createWalStore(conf, dir);
057  }
058
059  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
060      throws IOException {
061    return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
062      @Override
063      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
064        // no-op
065      }
066    });
067  }
068
069  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
070      boolean abort, boolean startWorkers) throws Exception {
071    restart(procExecutor, false, true, null, null, null,  abort, startWorkers);
072  }
073
074  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
075      boolean abort) throws Exception {
076    restart(procExecutor, false, true, null, null, null, abort, true);
077  }
078
079  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
080    restart(procExecutor, false, true, null, null, null, false, true);
081  }
082
083  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
084      boolean abortOnCorruption) throws IOException {
085    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
086  }
087
088  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
089      boolean abortOnCorruption, boolean startWorkers) throws IOException {
090    procExecutor.init(numThreads, abortOnCorruption);
091    if (startWorkers) {
092      procExecutor.startWorkers();
093    }
094  }
095
096  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
097      boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
098      Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
099    restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction,
100      actionBeforeStartWorker, startAction, false, true);
101  }
102
103  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
104      boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
105      Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
106      boolean startWorkers) throws Exception {
107    final ProcedureStore procStore = procExecutor.getStore();
108    final int storeThreads = procExecutor.getCorePoolSize();
109    final int execThreads = procExecutor.getCorePoolSize();
110
111    final ProcedureExecutor.Testing testing = procExecutor.testing;
112    if (avoidTestKillDuringRestart) {
113      procExecutor.testing = null;
114    }
115
116    // stop
117    LOG.info("RESTART - Stop");
118    procExecutor.stop();
119    procStore.stop(abort);
120    if (stopAction != null) {
121      stopAction.call();
122    }
123    procExecutor.join();
124    procExecutor.getScheduler().clear();
125
126    // nothing running...
127
128    // re-start
129    LOG.info("RESTART - Start");
130    procStore.start(storeThreads);
131    procExecutor.init(execThreads, failOnCorrupted);
132    if (actionBeforeStartWorker != null) {
133      actionBeforeStartWorker.call();
134    }
135    if (avoidTestKillDuringRestart) {
136      procExecutor.testing = testing;
137    }
138    if (startWorkers) {
139      procExecutor.startWorkers();
140    }
141    if (startAction != null) {
142      startAction.call();
143    }
144  }
145
146  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
147      throws Exception {
148    procStore.stop(false);
149    procStore.start(procStore.getNumThreads());
150    procStore.recoverLease();
151    procStore.load(loader);
152  }
153
154  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
155      long runnableCount, int completedCount, int corruptedCount) throws Exception {
156    final LoadCounter loader = new LoadCounter();
157    storeRestart(procStore, loader);
158    assertEquals(maxProcId, loader.getMaxProcId());
159    assertEquals(runnableCount, loader.getRunnableCount());
160    assertEquals(completedCount, loader.getCompletedCount());
161    assertEquals(corruptedCount, loader.getCorruptedCount());
162    return loader;
163  }
164
165  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
166    if (procExecutor.testing == null) {
167      procExecutor.testing = new ProcedureExecutor.Testing();
168    }
169  }
170
171  public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
172      boolean value) {
173    createExecutorTesting(procExecutor);
174    procExecutor.testing.killIfHasParent = value;
175  }
176
177  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
178      boolean value) {
179    createExecutorTesting(procExecutor);
180    procExecutor.testing.killIfSuspended = value;
181  }
182
183  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
184      boolean value) {
185    createExecutorTesting(procExecutor);
186    procExecutor.testing.killBeforeStoreUpdate = value;
187    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
188    assertSingleExecutorForKillTests(procExecutor);
189  }
190
191  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
192      boolean value) {
193    createExecutorTesting(procExecutor);
194    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
195    assertSingleExecutorForKillTests(procExecutor);
196  }
197
198  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
199    createExecutorTesting(procExecutor);
200    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
201    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
202    assertSingleExecutorForKillTests(procExecutor);
203  }
204
205  public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
206    createExecutorTesting(procExecutor);
207    procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate;
208    LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate);
209    assertSingleExecutorForKillTests(procExecutor);
210  }
211
212  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
213      boolean value) {
214    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
215    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
216    assertSingleExecutorForKillTests(procExecutor);
217  }
218
219  private static <TEnv> void assertSingleExecutorForKillTests(
220      final ProcedureExecutor<TEnv> procExecutor) {
221    if (procExecutor.testing == null) {
222      return;
223    }
224
225    if (procExecutor.testing.killBeforeStoreUpdate ||
226        procExecutor.testing.toggleKillBeforeStoreUpdate) {
227      assertEquals("expected only one executor running during test with kill/restart",
228        1, procExecutor.getCorePoolSize());
229    }
230  }
231
232  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
233      throws IOException {
234    NoopProcedureStore procStore = new NoopProcedureStore();
235    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
236    procStore.start(1);
237    initAndStartWorkers(procExecutor, 1, false, true);
238    try {
239      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
240    } finally {
241      procStore.stop(false);
242      procExecutor.stop();
243    }
244  }
245
246  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
247    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
248  }
249
250  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
251      final long nonceGroup, final long nonce) {
252    long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
253    waitProcedure(procExecutor, procId);
254    return procId;
255  }
256
257  public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
258      final long nonceGroup, final long nonce) {
259    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
260    long procId = procExecutor.registerNonce(nonceKey);
261    assertFalse(procId >= 0);
262    return procExecutor.submitProcedure(proc, nonceKey);
263  }
264
265  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
266    while (proc.getState() == ProcedureState.INITIALIZING) {
267      Threads.sleepWithoutInterrupt(250);
268    }
269    waitProcedure(procExecutor, proc.getProcId());
270  }
271
272  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
273    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
274      Threads.sleepWithoutInterrupt(250);
275    }
276  }
277
278  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
279    for (int i = 0; i < procIds.length; ++i) {
280      waitProcedure(procExecutor, procIds[i]);
281    }
282  }
283
284  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
285    for (long procId : procExecutor.getActiveProcIds()) {
286      waitProcedure(procExecutor, procId);
287    }
288  }
289
290  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
291    int stableRuns = 0;
292    while (stableRuns < 10) {
293      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
294        stableRuns = 0;
295        Threads.sleepWithoutInterrupt(100);
296      } else {
297        stableRuns++;
298        Threads.sleepWithoutInterrupt(25);
299      }
300    }
301  }
302
303  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
304      long procId) {
305    assertFalse("expected a running proc", procExecutor.isFinished(procId));
306    assertEquals(null, procExecutor.getResult(procId));
307  }
308
309  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
310      long procId) {
311    Procedure<?> result = procExecutor.getResult(procId);
312    assertTrue("expected procedure result", result != null);
313    assertProcNotFailed(result);
314  }
315
316  public static void assertProcNotFailed(final Procedure<?> result) {
317    assertFalse("found exception: " + result.getException(), result.isFailed());
318  }
319
320  public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
321      final long procId) {
322    Procedure<?> result = procExecutor.getResult(procId);
323    assertTrue("expected procedure result", result != null);
324    return assertProcFailed(result);
325  }
326
327  public static Throwable assertProcFailed(final Procedure<?> result) {
328    assertEquals(true, result.isFailed());
329    LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage());
330    return getExceptionCause(result);
331  }
332
333  public static void assertIsAbortException(final Procedure<?> result) {
334    Throwable cause = assertProcFailed(result);
335    assertTrue("expected abort exception, got "+ cause, cause instanceof ProcedureAbortedException);
336  }
337
338  public static void assertIsTimeoutException(final Procedure<?> result) {
339    Throwable cause = assertProcFailed(result);
340    assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
341  }
342
343  public static void assertIsIllegalArgumentException(final Procedure<?> result) {
344    Throwable cause = assertProcFailed(result);
345    assertTrue("expected IllegalArgumentIOException, got " + cause,
346      cause instanceof IllegalArgumentIOException);
347  }
348
349  public static Throwable getExceptionCause(final Procedure<?> procInfo) {
350    assert procInfo.isFailed();
351    Throwable cause = procInfo.getException().getCause();
352    return cause == null ? procInfo.getException() : cause;
353  }
354
355  /**
356   * Run through all procedure flow states TWICE while also restarting
357   * procedure executor at each step; i.e force a reread of procedure store.
358   *
359   *<p>It does
360   * <ol><li>Execute step N - kill the executor before store update
361   * <li>Restart executor/store
362   * <li>Execute step N - and then save to store
363   * </ol>
364   *
365   *<p>This is a good test for finding state that needs persisting and steps that are not
366   * idempotent.
367   */
368  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
369      final long procId) throws Exception {
370    testRecoveryAndDoubleExecution(procExec, procId, false);
371  }
372
373  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
374      final long procId, final boolean expectFailure) throws Exception {
375    testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
376  }
377
378  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
379      final long procId, final boolean expectFailure, final Runnable customRestart)
380      throws Exception {
381    Procedure proc = procExec.getProcedure(procId);
382    waitProcedure(procExec, procId);
383    assertEquals(false, procExec.isRunning());
384    for (int i = 0; !procExec.isFinished(procId); ++i) {
385      proc = procExec.getProcedure(procId);
386      LOG.info("Restart " + i + " exec state: " + proc);
387      if (customRestart != null) {
388        customRestart.run();
389      } else {
390        restart(procExec);
391      }
392      waitProcedure(procExec, procId);
393    }
394
395    assertEquals(true, procExec.isRunning());
396    if (expectFailure) {
397      assertProcFailed(procExec, procId);
398    } else {
399      assertProcNotFailed(procExec, procId);
400    }
401  }
402
403  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
404    public NoopProcedure() {}
405
406    @Override
407    protected Procedure<TEnv>[] execute(TEnv env)
408        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
409      return null;
410    }
411
412    @Override
413    protected void rollback(TEnv env) throws IOException, InterruptedException {
414    }
415
416    @Override
417    protected boolean abort(TEnv env) {
418      return false;
419    }
420
421    @Override
422    protected void serializeStateData(ProcedureStateSerializer serializer)
423        throws IOException {
424    }
425
426    @Override
427    protected void deserializeStateData(ProcedureStateSerializer serializer)
428        throws IOException {
429    }
430  }
431
432  public static class NoopStateMachineProcedure<TEnv, TState>
433      extends StateMachineProcedure<TEnv, TState> {
434    private TState initialState;
435    private TEnv env;
436
437    public NoopStateMachineProcedure() {
438    }
439
440    public NoopStateMachineProcedure(TEnv env, TState initialState) {
441      this.env = env;
442      this.initialState = initialState;
443    }
444
445    @Override
446    protected Flow executeFromState(TEnv env, TState tState)
447        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
448      return null;
449    }
450
451    @Override
452    protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {
453
454    }
455
456    @Override
457    protected TState getState(int stateId) {
458      return null;
459    }
460
461    @Override
462    protected int getStateId(TState tState) {
463      return 0;
464    }
465
466    @Override
467    protected TState getInitialState() {
468      return initialState;
469    }
470  }
471
472  public static class TestProcedure extends NoopProcedure<Void> {
473    private byte[] data = null;
474
475    public TestProcedure() {}
476
477    public TestProcedure(long procId) {
478      this(procId, 0);
479    }
480
481    public TestProcedure(long procId, long parentId) {
482      this(procId, parentId, null);
483    }
484
485    public TestProcedure(long procId, long parentId, byte[] data) {
486      this(procId, parentId, parentId, data);
487    }
488
489    public TestProcedure(long procId, long parentId, long rootId, byte[] data) {
490      setData(data);
491      setProcId(procId);
492      if (parentId > 0) {
493        setParentProcId(parentId);
494      }
495      if (rootId > 0 || parentId > 0) {
496        setRootProcId(rootId);
497      }
498    }
499
500    public void addStackId(final int index) {
501      addStackIndex(index);
502    }
503
504    public void setSuccessState() {
505      setState(ProcedureState.SUCCESS);
506    }
507
508    public void setData(final byte[] data) {
509      this.data = data;
510    }
511
512    @Override
513    protected void serializeStateData(ProcedureStateSerializer serializer)
514        throws IOException {
515      ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
516      BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
517      serializer.serialize(builder.build());
518    }
519
520    @Override
521    protected void deserializeStateData(ProcedureStateSerializer serializer)
522        throws IOException {
523      BytesValue bytesValue = serializer.deserialize(BytesValue.class);
524      ByteString dataString = bytesValue.getValue();
525
526      if (dataString.isEmpty()) {
527        data = null;
528      } else {
529        data = dataString.toByteArray();
530      }
531    }
532
533    // Mark acquire/release lock functions public for test uses.
534    @Override
535    public LockState acquireLock(Void env) {
536      return LockState.LOCK_ACQUIRED;
537    }
538
539    @Override
540    public void releaseLock(Void env) {
541      // no-op
542    }
543  }
544
545  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
546    private final ArrayList<Procedure> corrupted = new ArrayList<>();
547    private final ArrayList<Procedure> completed = new ArrayList<>();
548    private final ArrayList<Procedure> runnable = new ArrayList<>();
549
550    private Set<Long> procIds;
551    private long maxProcId = 0;
552
553    public LoadCounter() {
554      this(null);
555    }
556
557    public LoadCounter(final Set<Long> procIds) {
558      this.procIds = procIds;
559    }
560
561    public void reset() {
562      reset(null);
563    }
564
565    public void reset(final Set<Long> procIds) {
566      corrupted.clear();
567      completed.clear();
568      runnable.clear();
569      this.procIds = procIds;
570      this.maxProcId = 0;
571    }
572
573    public long getMaxProcId() {
574      return maxProcId;
575    }
576
577    public ArrayList<Procedure> getRunnables() {
578      return runnable;
579    }
580
581    public int getRunnableCount() {
582      return runnable.size();
583    }
584
585    public ArrayList<Procedure> getCompleted() {
586      return completed;
587    }
588
589    public int getCompletedCount() {
590      return completed.size();
591    }
592
593    public int getLoadedCount() {
594      return runnable.size() + completed.size();
595    }
596
597    public ArrayList<Procedure> getCorrupted() {
598      return corrupted;
599    }
600
601    public int getCorruptedCount() {
602      return corrupted.size();
603    }
604
605    public boolean isRunnable(final long procId) {
606      for (Procedure proc: runnable) {
607        if (proc.getProcId() == procId) {
608          return true;
609        }
610      }
611      return false;
612    }
613
614    @Override
615    public void setMaxProcId(long maxProcId) {
616      this.maxProcId = maxProcId;
617    }
618
619    @Override
620    public void load(ProcedureIterator procIter) throws IOException {
621      while (procIter.hasNext()) {
622        long procId;
623        if (procIter.isNextFinished()) {
624          Procedure<?> proc = procIter.next();
625          procId = proc.getProcId();
626          LOG.debug("loading completed procId=" + procId + ": " + proc);
627          completed.add(proc);
628        } else {
629          Procedure proc = procIter.next();
630          procId = proc.getProcId();
631          LOG.debug("loading runnable procId=" + procId + ": " + proc);
632          runnable.add(proc);
633        }
634        if (procIds != null) {
635          assertTrue("procId=" + procId + " unexpected", procIds.contains(procId));
636        }
637      }
638    }
639
640    @Override
641    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
642      while (procIter.hasNext()) {
643        Procedure proc = procIter.next();
644        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
645        corrupted.add(proc);
646      }
647    }
648  }
649}