001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
033import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
034import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
035import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
036import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
037import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
038import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
039import org.apache.hadoop.hbase.util.NonceKey;
040import org.apache.hadoop.hbase.util.Threads;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
045import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
048
049public final class ProcedureTestingUtility {
050  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);
051
052  private ProcedureTestingUtility() {
053  }
054
055  public static ProcedureStore createStore(final Configuration conf, final Path dir)
056    throws IOException {
057    return createWalStore(conf, dir);
058  }
059
060  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
061    throws IOException {
062    return new WALProcedureStore(conf, dir, null, new LeaseRecovery() {
063      @Override
064      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
065        // no-op
066      }
067    });
068  }
069
070  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort,
071    boolean startWorkers) throws Exception {
072    restart(procExecutor, false, true, null, null, null, abort, startWorkers);
073  }
074
075  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort)
076    throws Exception {
077    restart(procExecutor, false, true, null, null, null, abort, true);
078  }
079
080  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
081    restart(procExecutor, false, true, null, null, null, false, true);
082  }
083
084  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
085    boolean abortOnCorruption) throws IOException {
086    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
087  }
088
089  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
090    boolean abortOnCorruption, boolean startWorkers) throws IOException {
091    procExecutor.init(numThreads, abortOnCorruption);
092    if (startWorkers) {
093      procExecutor.startWorkers();
094    }
095  }
096
097  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
098    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
099    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
100    restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction,
101      actionBeforeStartWorker, startAction, false, true);
102  }
103
104  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
105    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
106    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
107    boolean startWorkers) throws Exception {
108    final ProcedureStore procStore = procExecutor.getStore();
109    final int storeThreads = procExecutor.getCorePoolSize();
110    final int execThreads = procExecutor.getCorePoolSize();
111
112    final ProcedureExecutor.Testing testing = procExecutor.testing;
113    if (avoidTestKillDuringRestart) {
114      procExecutor.testing = null;
115    }
116
117    // stop
118    LOG.info("RESTART - Stop");
119    procExecutor.stop();
120    procStore.stop(abort);
121    if (stopAction != null) {
122      stopAction.call();
123    }
124    procExecutor.join();
125    procExecutor.getScheduler().clear();
126
127    // nothing running...
128
129    // re-start
130    LOG.info("RESTART - Start");
131    procStore.start(storeThreads);
132    procExecutor.init(execThreads, failOnCorrupted);
133    if (actionBeforeStartWorker != null) {
134      actionBeforeStartWorker.call();
135    }
136    if (avoidTestKillDuringRestart) {
137      procExecutor.testing = testing;
138    }
139    if (startWorkers) {
140      procExecutor.startWorkers();
141    }
142    if (startAction != null) {
143      startAction.call();
144    }
145  }
146
147  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
148    throws Exception {
149    storeRestart(procStore, false, loader);
150  }
151
152  public static void storeRestart(ProcedureStore procStore, boolean abort,
153    ProcedureStore.ProcedureLoader loader) throws Exception {
154    procStore.stop(abort);
155    procStore.start(procStore.getNumThreads());
156    procStore.recoverLease();
157    procStore.load(loader);
158  }
159
160  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
161    long runnableCount, int completedCount, int corruptedCount) throws Exception {
162    final LoadCounter loader = new LoadCounter();
163    storeRestart(procStore, loader);
164    assertEquals(maxProcId, loader.getMaxProcId());
165    assertEquals(runnableCount, loader.getRunnableCount());
166    assertEquals(completedCount, loader.getCompletedCount());
167    assertEquals(corruptedCount, loader.getCorruptedCount());
168    return loader;
169  }
170
171  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
172    if (procExecutor.testing == null) {
173      procExecutor.testing = new ProcedureExecutor.Testing();
174    }
175  }
176
177  public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
178    boolean value) {
179    createExecutorTesting(procExecutor);
180    procExecutor.testing.killIfHasParent = value;
181  }
182
183  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
184    boolean value) {
185    createExecutorTesting(procExecutor);
186    procExecutor.testing.killIfSuspended = value;
187  }
188
189  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
190    boolean value) {
191    createExecutorTesting(procExecutor);
192    procExecutor.testing.killBeforeStoreUpdate = value;
193    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
194    assertSingleExecutorForKillTests(procExecutor);
195  }
196
197  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
198    boolean value) {
199    createExecutorTesting(procExecutor);
200    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
201    assertSingleExecutorForKillTests(procExecutor);
202  }
203
204  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
205    createExecutorTesting(procExecutor);
206    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
207    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
208    assertSingleExecutorForKillTests(procExecutor);
209  }
210
211  public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
212    createExecutorTesting(procExecutor);
213    procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate;
214    LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate);
215    assertSingleExecutorForKillTests(procExecutor);
216  }
217
218  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
219    boolean value) {
220    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
221    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
222    assertSingleExecutorForKillTests(procExecutor);
223  }
224
225  private static <TEnv> void
226    assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) {
227    if (procExecutor.testing == null) {
228      return;
229    }
230
231    if (procExecutor.testing.killBeforeStoreUpdate ||
232      procExecutor.testing.toggleKillBeforeStoreUpdate) {
233      assertEquals("expected only one executor running during test with kill/restart", 1,
234        procExecutor.getCorePoolSize());
235    }
236  }
237
238  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
239    throws IOException {
240    NoopProcedureStore procStore = new NoopProcedureStore();
241    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
242    procStore.start(1);
243    initAndStartWorkers(procExecutor, 1, false, true);
244    try {
245      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
246    } finally {
247      procStore.stop(false);
248      procExecutor.stop();
249    }
250  }
251
252  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
253    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
254  }
255
256  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
257    final long nonceGroup, final long nonce) {
258    long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
259    waitProcedure(procExecutor, procId);
260    return procId;
261  }
262
263  public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
264    final long nonceGroup, final long nonce) {
265    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
266    long procId = procExecutor.registerNonce(nonceKey);
267    assertFalse(procId >= 0);
268    return procExecutor.submitProcedure(proc, nonceKey);
269  }
270
271  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
272    while (proc.getState() == ProcedureState.INITIALIZING) {
273      Threads.sleepWithoutInterrupt(250);
274    }
275    waitProcedure(procExecutor, proc.getProcId());
276  }
277
278  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
279    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
280      Threads.sleepWithoutInterrupt(250);
281    }
282  }
283
284  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
285    for (int i = 0; i < procIds.length; ++i) {
286      waitProcedure(procExecutor, procIds[i]);
287    }
288  }
289
290  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
291    for (long procId : procExecutor.getActiveProcIds()) {
292      waitProcedure(procExecutor, procId);
293    }
294  }
295
296  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
297    int stableRuns = 0;
298    while (stableRuns < 10) {
299      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
300        stableRuns = 0;
301        Threads.sleepWithoutInterrupt(100);
302      } else {
303        stableRuns++;
304        Threads.sleepWithoutInterrupt(25);
305      }
306    }
307  }
308
309  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
310    long procId) {
311    assertFalse("expected a running proc", procExecutor.isFinished(procId));
312    assertEquals(null, procExecutor.getResult(procId));
313  }
314
315  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) {
316    Procedure<?> result = procExecutor.getResult(procId);
317    assertTrue("expected procedure result", result != null);
318    assertProcNotFailed(result);
319  }
320
321  public static void assertProcNotFailed(final Procedure<?> result) {
322    assertFalse("found exception: " + result.getException(), result.isFailed());
323  }
324
325  public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
326    final long procId) {
327    Procedure<?> result = procExecutor.getResult(procId);
328    assertTrue("expected procedure result", result != null);
329    return assertProcFailed(result);
330  }
331
332  public static Throwable assertProcFailed(final Procedure<?> result) {
333    assertEquals(true, result.isFailed());
334    LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage());
335    return getExceptionCause(result);
336  }
337
338  public static void assertIsAbortException(final Procedure<?> result) {
339    Throwable cause = assertProcFailed(result);
340    assertTrue("expected abort exception, got " + cause,
341      cause instanceof ProcedureAbortedException);
342  }
343
344  public static void assertIsTimeoutException(final Procedure<?> result) {
345    Throwable cause = assertProcFailed(result);
346    assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
347  }
348
349  public static void assertIsIllegalArgumentException(final Procedure<?> result) {
350    Throwable cause = assertProcFailed(result);
351    assertTrue("expected IllegalArgumentIOException, got " + cause,
352      cause instanceof IllegalArgumentIOException);
353  }
354
355  public static Throwable getExceptionCause(final Procedure<?> procInfo) {
356    assert procInfo.isFailed();
357    Throwable cause = procInfo.getException().getCause();
358    return cause == null ? procInfo.getException() : cause;
359  }
360
361  /**
362   * Run through all procedure flow states TWICE while also restarting procedure executor at each
363   * step; i.e force a reread of procedure store.
364   * <p>
365   * It does
366   * <ol>
367   * <li>Execute step N - kill the executor before store update
368   * <li>Restart executor/store
369   * <li>Execute step N - and then save to store
370   * </ol>
371   * <p>
372   * This is a good test for finding state that needs persisting and steps that are not idempotent.
373   */
374  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
375    final long procId) throws Exception {
376    testRecoveryAndDoubleExecution(procExec, procId, false);
377  }
378
379  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
380    final long procId, final boolean expectFailure) throws Exception {
381    testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
382  }
383
384  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
385    final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception {
386    Procedure proc = procExec.getProcedure(procId);
387    waitProcedure(procExec, procId);
388    assertEquals(false, procExec.isRunning());
389    for (int i = 0; !procExec.isFinished(procId); ++i) {
390      proc = procExec.getProcedure(procId);
391      LOG.info("Restart " + i + " exec state: " + proc);
392      if (customRestart != null) {
393        customRestart.run();
394      } else {
395        restart(procExec);
396      }
397      waitProcedure(procExec, procId);
398    }
399
400    assertEquals(true, procExec.isRunning());
401    if (expectFailure) {
402      assertProcFailed(procExec, procId);
403    } else {
404      assertProcNotFailed(procExec, procId);
405    }
406  }
407
408  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
409    public NoopProcedure() {
410    }
411
412    @Override
413    protected Procedure<TEnv>[] execute(TEnv env)
414      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
415      return null;
416    }
417
418    @Override
419    protected void rollback(TEnv env) throws IOException, InterruptedException {
420    }
421
422    @Override
423    protected boolean abort(TEnv env) {
424      return false;
425    }
426
427    @Override
428    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
429    }
430
431    @Override
432    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
433    }
434  }
435
436  public static class NoopStateMachineProcedure<TEnv, TState>
437    extends StateMachineProcedure<TEnv, TState> {
438    private TState initialState;
439    private TEnv env;
440
441    public NoopStateMachineProcedure() {
442    }
443
444    public NoopStateMachineProcedure(TEnv env, TState initialState) {
445      this.env = env;
446      this.initialState = initialState;
447    }
448
449    @Override
450    protected Flow executeFromState(TEnv env, TState tState)
451      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
452      return null;
453    }
454
455    @Override
456    protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {
457
458    }
459
460    @Override
461    protected TState getState(int stateId) {
462      return null;
463    }
464
465    @Override
466    protected int getStateId(TState tState) {
467      return 0;
468    }
469
470    @Override
471    protected TState getInitialState() {
472      return initialState;
473    }
474  }
475
476  public static class TestProcedure extends NoopProcedure<Void> {
477    private byte[] data = null;
478
479    public TestProcedure() {
480    }
481
482    public TestProcedure(long procId) {
483      this(procId, 0);
484    }
485
486    public TestProcedure(long procId, long parentId) {
487      this(procId, parentId, null);
488    }
489
490    public TestProcedure(long procId, long parentId, byte[] data) {
491      this(procId, parentId, parentId, data);
492    }
493
494    public TestProcedure(long procId, long parentId, long rootId, byte[] data) {
495      setData(data);
496      setProcId(procId);
497      if (parentId > 0) {
498        setParentProcId(parentId);
499      }
500      if (rootId > 0 || parentId > 0) {
501        setRootProcId(rootId);
502      }
503    }
504
505    public void addStackId(final int index) {
506      addStackIndex(index);
507    }
508
509    public void setSuccessState() {
510      setState(ProcedureState.SUCCESS);
511    }
512
513    public void setData(final byte[] data) {
514      this.data = data;
515    }
516
517    @Override
518    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
519      ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
520      BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
521      serializer.serialize(builder.build());
522    }
523
524    @Override
525    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
526      BytesValue bytesValue = serializer.deserialize(BytesValue.class);
527      ByteString dataString = bytesValue.getValue();
528
529      if (dataString.isEmpty()) {
530        data = null;
531      } else {
532        data = dataString.toByteArray();
533      }
534    }
535
536    // Mark acquire/release lock functions public for test uses.
537    @Override
538    public LockState acquireLock(Void env) {
539      return LockState.LOCK_ACQUIRED;
540    }
541
542    @Override
543    public void releaseLock(Void env) {
544      // no-op
545    }
546  }
547
548  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
549    private final ArrayList<Procedure> corrupted = new ArrayList<>();
550    private final ArrayList<Procedure> completed = new ArrayList<>();
551    private final ArrayList<Procedure> runnable = new ArrayList<>();
552
553    private Set<Long> procIds;
554    private long maxProcId = 0;
555
556    public LoadCounter() {
557      this(null);
558    }
559
560    public LoadCounter(final Set<Long> procIds) {
561      this.procIds = procIds;
562    }
563
564    public void reset() {
565      reset(null);
566    }
567
568    public void reset(final Set<Long> procIds) {
569      corrupted.clear();
570      completed.clear();
571      runnable.clear();
572      this.procIds = procIds;
573      this.maxProcId = 0;
574    }
575
576    public long getMaxProcId() {
577      return maxProcId;
578    }
579
580    public ArrayList<Procedure> getRunnables() {
581      return runnable;
582    }
583
584    public int getRunnableCount() {
585      return runnable.size();
586    }
587
588    public ArrayList<Procedure> getCompleted() {
589      return completed;
590    }
591
592    public int getCompletedCount() {
593      return completed.size();
594    }
595
596    public int getLoadedCount() {
597      return runnable.size() + completed.size();
598    }
599
600    public ArrayList<Procedure> getCorrupted() {
601      return corrupted;
602    }
603
604    public int getCorruptedCount() {
605      return corrupted.size();
606    }
607
608    public boolean isRunnable(final long procId) {
609      for (Procedure proc : runnable) {
610        if (proc.getProcId() == procId) {
611          return true;
612        }
613      }
614      return false;
615    }
616
617    @Override
618    public void setMaxProcId(long maxProcId) {
619      this.maxProcId = maxProcId;
620    }
621
622    @Override
623    public void load(ProcedureIterator procIter) throws IOException {
624      while (procIter.hasNext()) {
625        long procId;
626        if (procIter.isNextFinished()) {
627          Procedure<?> proc = procIter.next();
628          procId = proc.getProcId();
629          LOG.debug("loading completed procId=" + procId + ": " + proc);
630          completed.add(proc);
631        } else {
632          Procedure proc = procIter.next();
633          procId = proc.getProcId();
634          LOG.debug("loading runnable procId=" + procId + ": " + proc);
635          runnable.add(proc);
636        }
637        if (procIds != null) {
638          assertTrue("procId=" + procId + " unexpected", procIds.contains(procId));
639        }
640      }
641    }
642
643    @Override
644    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
645      while (procIter.hasNext()) {
646        Procedure proc = procIter.next();
647        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
648        corrupted.add(proc);
649      }
650    }
651  }
652}