/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

049public final class ProcedureTestingUtility {
050  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);
051
052  private ProcedureTestingUtility() {
053  }
054
055  public static ProcedureStore createStore(final Configuration conf, final Path dir)
056    throws IOException {
057    return createWalStore(conf, dir);
058  }
059
060  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
061    throws IOException {
062    return new WALProcedureStore(conf, dir, null, new LeaseRecovery() {
063      @Override
064      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
065        // no-op
066      }
067    });
068  }
069
070  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort,
071    boolean startWorkers) throws Exception {
072    restart(procExecutor, false, true, null, null, null, abort, startWorkers);
073  }
074
075  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort)
076    throws Exception {
077    restart(procExecutor, false, true, null, null, null, abort, true);
078  }
079
080  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
081    restart(procExecutor, false, true, null, null, null, false, true);
082  }
083
084  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
085    boolean abortOnCorruption) throws IOException {
086    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
087  }
088
089  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
090    boolean abortOnCorruption, boolean startWorkers) throws IOException {
091    procExecutor.init(numThreads, abortOnCorruption);
092    if (startWorkers) {
093      procExecutor.startWorkers();
094    }
095  }
096
097  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
098    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
099    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
100    restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction,
101      actionBeforeStartWorker, startAction, false, true);
102  }
103
104  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
105    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
106    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
107    boolean startWorkers) throws Exception {
108    final ProcedureStore procStore = procExecutor.getStore();
109    final int storeThreads = procExecutor.getCorePoolSize();
110    final int execThreads = procExecutor.getCorePoolSize();
111
112    final ProcedureExecutor.Testing testing = procExecutor.testing;
113    if (avoidTestKillDuringRestart) {
114      procExecutor.testing = null;
115    }
116
117    // stop
118    LOG.info("RESTART - Stop");
119    procExecutor.stop();
120    procStore.stop(abort);
121    if (stopAction != null) {
122      stopAction.call();
123    }
124    procExecutor.join();
125    procExecutor.getScheduler().clear();
126
127    // nothing running...
128
129    // re-start
130    LOG.info("RESTART - Start");
131    procStore.start(storeThreads);
132    procExecutor.init(execThreads, failOnCorrupted);
133    if (actionBeforeStartWorker != null) {
134      actionBeforeStartWorker.call();
135    }
136    if (avoidTestKillDuringRestart) {
137      procExecutor.testing = testing;
138    }
139    if (startWorkers) {
140      procExecutor.startWorkers();
141    }
142    if (startAction != null) {
143      startAction.call();
144    }
145  }
146
147  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
148    throws Exception {
149    storeRestart(procStore, false, loader);
150  }
151
152  public static void storeRestart(ProcedureStore procStore, boolean abort,
153    ProcedureStore.ProcedureLoader loader) throws Exception {
154    procStore.stop(abort);
155    procStore.start(procStore.getNumThreads());
156    procStore.recoverLease();
157    procStore.load(loader);
158  }
159
160  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
161    long runnableCount, int completedCount, int corruptedCount) throws Exception {
162    final LoadCounter loader = new LoadCounter();
163    storeRestart(procStore, loader);
164    assertEquals(maxProcId, loader.getMaxProcId());
165    assertEquals(runnableCount, loader.getRunnableCount());
166    assertEquals(completedCount, loader.getCompletedCount());
167    assertEquals(corruptedCount, loader.getCorruptedCount());
168    return loader;
169  }
170
171  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
172    if (procExecutor.testing == null) {
173      procExecutor.testing = new ProcedureExecutor.Testing();
174    }
175  }
176
177  public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
178    boolean value) {
179    createExecutorTesting(procExecutor);
180    procExecutor.testing.killIfHasParent = value;
181  }
182
183  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
184    boolean value) {
185    createExecutorTesting(procExecutor);
186    procExecutor.testing.killIfSuspended = value;
187  }
188
189  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
190    boolean value) {
191    createExecutorTesting(procExecutor);
192    procExecutor.testing.killBeforeStoreUpdate = value;
193    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
194    assertSingleExecutorForKillTests(procExecutor);
195  }
196
197  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
198    boolean value) {
199    createExecutorTesting(procExecutor);
200    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
201    assertSingleExecutorForKillTests(procExecutor);
202  }
203
204  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
205    createExecutorTesting(procExecutor);
206    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
207    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
208    assertSingleExecutorForKillTests(procExecutor);
209  }
210
211  public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
212    createExecutorTesting(procExecutor);
213    procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate;
214    LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate);
215    assertSingleExecutorForKillTests(procExecutor);
216  }
217
218  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
219    boolean value) {
220    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
221    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
222    assertSingleExecutorForKillTests(procExecutor);
223  }
224
225  private static <TEnv> void
226    assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) {
227    if (procExecutor.testing == null) {
228      return;
229    }
230
231    if (
232      procExecutor.testing.killBeforeStoreUpdate || procExecutor.testing.toggleKillBeforeStoreUpdate
233    ) {
234      assertEquals("expected only one executor running during test with kill/restart", 1,
235        procExecutor.getCorePoolSize());
236    }
237  }
238
239  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
240    throws IOException {
241    NoopProcedureStore procStore = new NoopProcedureStore();
242    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
243    procStore.start(1);
244    initAndStartWorkers(procExecutor, 1, false, true);
245    try {
246      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
247    } finally {
248      procStore.stop(false);
249      procExecutor.stop();
250    }
251  }
252
253  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
254    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
255  }
256
257  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
258    final long nonceGroup, final long nonce) {
259    long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
260    waitProcedure(procExecutor, procId);
261    return procId;
262  }
263
264  public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
265    final long nonceGroup, final long nonce) {
266    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
267    long procId = procExecutor.registerNonce(nonceKey);
268    assertFalse(procId >= 0);
269    return procExecutor.submitProcedure(proc, nonceKey);
270  }
271
272  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
273    while (proc.getState() == ProcedureState.INITIALIZING) {
274      Threads.sleepWithoutInterrupt(250);
275    }
276    waitProcedure(procExecutor, proc.getProcId());
277  }
278
279  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
280    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
281      Threads.sleepWithoutInterrupt(250);
282    }
283  }
284
285  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
286    for (int i = 0; i < procIds.length; ++i) {
287      waitProcedure(procExecutor, procIds[i]);
288    }
289  }
290
291  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
292    for (long procId : procExecutor.getActiveProcIds()) {
293      waitProcedure(procExecutor, procId);
294    }
295  }
296
297  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
298    int stableRuns = 0;
299    while (stableRuns < 10) {
300      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
301        stableRuns = 0;
302        Threads.sleepWithoutInterrupt(100);
303      } else {
304        stableRuns++;
305        Threads.sleepWithoutInterrupt(25);
306      }
307    }
308  }
309
310  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
311    long procId) {
312    assertFalse("expected a running proc", procExecutor.isFinished(procId));
313    assertEquals(null, procExecutor.getResult(procId));
314  }
315
316  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) {
317    Procedure<?> result = procExecutor.getResult(procId);
318    assertTrue("expected procedure result", result != null);
319    assertProcNotFailed(result);
320  }
321
322  public static void assertProcNotFailed(final Procedure<?> result) {
323    assertFalse("found exception: " + result.getException(), result.isFailed());
324  }
325
326  public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
327    final long procId) {
328    Procedure<?> result = procExecutor.getResult(procId);
329    assertTrue("expected procedure result", result != null);
330    return assertProcFailed(result);
331  }
332
333  public static Throwable assertProcFailed(final Procedure<?> result) {
334    assertEquals(true, result.isFailed());
335    LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage());
336    return getExceptionCause(result);
337  }
338
339  public static void assertIsAbortException(final Procedure<?> result) {
340    Throwable cause = assertProcFailed(result);
341    assertTrue("expected abort exception, got " + cause,
342      cause instanceof ProcedureAbortedException);
343  }
344
345  public static void assertIsTimeoutException(final Procedure<?> result) {
346    Throwable cause = assertProcFailed(result);
347    assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
348  }
349
350  public static void assertIsIllegalArgumentException(final Procedure<?> result) {
351    Throwable cause = assertProcFailed(result);
352    assertTrue("expected IllegalArgumentIOException, got " + cause,
353      cause instanceof IllegalArgumentIOException);
354  }
355
356  public static Throwable getExceptionCause(final Procedure<?> procInfo) {
357    assert procInfo.isFailed();
358    Throwable cause = procInfo.getException().getCause();
359    return cause == null ? procInfo.getException() : cause;
360  }
361
362  /**
363   * Run through all procedure flow states TWICE while also restarting procedure executor at each
364   * step; i.e force a reread of procedure store.
365   * <p>
366   * It does
367   * <ol>
368   * <li>Execute step N - kill the executor before store update
369   * <li>Restart executor/store
370   * <li>Execute step N - and then save to store
371   * </ol>
372   * <p>
373   * This is a good test for finding state that needs persisting and steps that are not idempotent.
374   */
375  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
376    final long procId) throws Exception {
377    testRecoveryAndDoubleExecution(procExec, procId, false);
378  }
379
380  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
381    final long procId, final boolean expectFailure) throws Exception {
382    testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
383  }
384
385  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
386    final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception {
387    Procedure proc = procExec.getProcedure(procId);
388    waitProcedure(procExec, procId);
389    assertEquals(false, procExec.isRunning());
390    for (int i = 0; !procExec.isFinished(procId); ++i) {
391      proc = procExec.getProcedure(procId);
392      LOG.info("Restart " + i + " exec state: " + proc);
393      if (customRestart != null) {
394        customRestart.run();
395      } else {
396        restart(procExec);
397      }
398      waitProcedure(procExec, procId);
399    }
400
401    assertEquals(true, procExec.isRunning());
402    if (expectFailure) {
403      assertProcFailed(procExec, procId);
404    } else {
405      assertProcNotFailed(procExec, procId);
406    }
407  }
408
409  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
410    public NoopProcedure() {
411    }
412
413    @Override
414    protected Procedure<TEnv>[] execute(TEnv env)
415      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
416      return null;
417    }
418
419    @Override
420    protected void rollback(TEnv env) throws IOException, InterruptedException {
421    }
422
423    @Override
424    protected boolean abort(TEnv env) {
425      return false;
426    }
427
428    @Override
429    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
430    }
431
432    @Override
433    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
434    }
435  }
436
437  public static class NoopStateMachineProcedure<TEnv, TState>
438    extends StateMachineProcedure<TEnv, TState> {
439    private TState initialState;
440    private TEnv env;
441
442    public NoopStateMachineProcedure() {
443    }
444
445    public NoopStateMachineProcedure(TEnv env, TState initialState) {
446      this.env = env;
447      this.initialState = initialState;
448    }
449
450    @Override
451    protected Flow executeFromState(TEnv env, TState tState)
452      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
453      return null;
454    }
455
456    @Override
457    protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {
458
459    }
460
461    @Override
462    protected TState getState(int stateId) {
463      return null;
464    }
465
466    @Override
467    protected int getStateId(TState tState) {
468      return 0;
469    }
470
471    @Override
472    protected TState getInitialState() {
473      return initialState;
474    }
475  }
476
477  public static class TestProcedure extends NoopProcedure<Void> {
478    private byte[] data = null;
479
480    public TestProcedure() {
481    }
482
483    public TestProcedure(long procId) {
484      this(procId, 0);
485    }
486
487    public TestProcedure(long procId, long parentId) {
488      this(procId, parentId, null);
489    }
490
491    public TestProcedure(long procId, long parentId, byte[] data) {
492      this(procId, parentId, parentId, data);
493    }
494
495    public TestProcedure(long procId, long parentId, long rootId, byte[] data) {
496      setData(data);
497      setProcId(procId);
498      if (parentId > 0) {
499        setParentProcId(parentId);
500      }
501      if (rootId > 0 || parentId > 0) {
502        setRootProcId(rootId);
503      }
504    }
505
506    public void addStackId(final int index) {
507      addStackIndex(index);
508    }
509
510    public void setSuccessState() {
511      setState(ProcedureState.SUCCESS);
512    }
513
514    public void setData(final byte[] data) {
515      this.data = data;
516    }
517
518    @Override
519    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
520      ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
521      BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
522      serializer.serialize(builder.build());
523    }
524
525    @Override
526    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
527      BytesValue bytesValue = serializer.deserialize(BytesValue.class);
528      ByteString dataString = bytesValue.getValue();
529
530      if (dataString.isEmpty()) {
531        data = null;
532      } else {
533        data = dataString.toByteArray();
534      }
535    }
536
537    // Mark acquire/release lock functions public for test uses.
538    @Override
539    public LockState acquireLock(Void env) {
540      return LockState.LOCK_ACQUIRED;
541    }
542
543    @Override
544    public void releaseLock(Void env) {
545      // no-op
546    }
547  }
548
549  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
550    private final ArrayList<Procedure> corrupted = new ArrayList<>();
551    private final ArrayList<Procedure> completed = new ArrayList<>();
552    private final ArrayList<Procedure> runnable = new ArrayList<>();
553
554    private Set<Long> procIds;
555    private long maxProcId = 0;
556
557    public LoadCounter() {
558      this(null);
559    }
560
561    public LoadCounter(final Set<Long> procIds) {
562      this.procIds = procIds;
563    }
564
565    public void reset() {
566      reset(null);
567    }
568
569    public void reset(final Set<Long> procIds) {
570      corrupted.clear();
571      completed.clear();
572      runnable.clear();
573      this.procIds = procIds;
574      this.maxProcId = 0;
575    }
576
577    public long getMaxProcId() {
578      return maxProcId;
579    }
580
581    public ArrayList<Procedure> getRunnables() {
582      return runnable;
583    }
584
585    public int getRunnableCount() {
586      return runnable.size();
587    }
588
589    public ArrayList<Procedure> getCompleted() {
590      return completed;
591    }
592
593    public int getCompletedCount() {
594      return completed.size();
595    }
596
597    public int getLoadedCount() {
598      return runnable.size() + completed.size();
599    }
600
601    public ArrayList<Procedure> getCorrupted() {
602      return corrupted;
603    }
604
605    public int getCorruptedCount() {
606      return corrupted.size();
607    }
608
609    public boolean isRunnable(final long procId) {
610      for (Procedure proc : runnable) {
611        if (proc.getProcId() == procId) {
612          return true;
613        }
614      }
615      return false;
616    }
617
618    @Override
619    public void setMaxProcId(long maxProcId) {
620      this.maxProcId = maxProcId;
621    }
622
623    @Override
624    public void load(ProcedureIterator procIter) throws IOException {
625      while (procIter.hasNext()) {
626        long procId;
627        if (procIter.isNextFinished()) {
628          Procedure<?> proc = procIter.next();
629          procId = proc.getProcId();
630          LOG.debug("loading completed procId=" + procId + ": " + proc);
631          completed.add(proc);
632        } else {
633          Procedure proc = procIter.next();
634          procId = proc.getProcId();
635          LOG.debug("loading runnable procId=" + procId + ": " + proc);
636          runnable.add(proc);
637        }
638        if (procIds != null) {
639          assertTrue("procId=" + procId + " unexpected", procIds.contains(procId));
640        }
641      }
642    }
643
644    @Override
645    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
646      while (procIter.hasNext()) {
647        Procedure proc = procIter.next();
648        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
649        corrupted.add(proc);
650      }
651    }
652  }
653}