001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Set;
028import java.util.concurrent.Callable;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
035import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
036import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
037import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
038import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
039import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
040import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
041import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
043import org.apache.hadoop.hbase.util.NonceKey;
044import org.apache.hadoop.hbase.util.Threads;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048public class ProcedureTestingUtility {
049  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);
050
051  private ProcedureTestingUtility() {
052  }
053
054  public static ProcedureStore createStore(final Configuration conf, final Path dir)
055      throws IOException {
056    return createWalStore(conf, dir);
057  }
058
059  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
060      throws IOException {
061    return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
062      @Override
063      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
064        // no-op
065      }
066    });
067  }
068
069  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
070      boolean abort, boolean startWorkers) throws Exception {
071    restart(procExecutor, false, true, null, null, abort, startWorkers);
072  }
073
074  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
075      boolean abort) throws Exception {
076    restart(procExecutor, false, true, null, null, abort, true);
077  }
078
079  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
080    restart(procExecutor, false, true, null, null, false, true);
081  }
082
083  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
084      boolean abortOnCorruption) throws IOException {
085    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
086  }
087
088  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
089      boolean abortOnCorruption, boolean startWorkers) throws IOException {
090    initAndStartWorkers(procExecutor, numThreads, 0, abortOnCorruption, startWorkers);
091  }
092
093  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
094      int numUrgentThreads, boolean abortOnCorruption, boolean startWorkers) throws IOException {
095    procExecutor.init(numThreads, numUrgentThreads, abortOnCorruption);
096    if (startWorkers) {
097      procExecutor.startWorkers();
098    }
099  }
100
101  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
102      final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted,
103      final Callable<Void> stopAction, final Callable<Void> startAction)
104      throws Exception {
105    restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted,
106        stopAction, startAction, false, true);
107  }
108
109  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
110      final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted,
111      final Callable<Void> stopAction, final Callable<Void> startAction,
112      boolean abort, boolean startWorkers)
113      throws Exception {
114    final ProcedureStore procStore = procExecutor.getStore();
115    final int storeThreads = procExecutor.getCorePoolSize();
116    final int execThreads = procExecutor.getCorePoolSize();
117    final int urgentThreads = procExecutor.getUrgentPoolSize();
118
119    final ProcedureExecutor.Testing testing = procExecutor.testing;
120    if (avoidTestKillDuringRestart) {
121      procExecutor.testing = null;
122    }
123
124    // stop
125    LOG.info("RESTART - Stop");
126    procExecutor.stop();
127    procStore.stop(abort);
128    if (stopAction != null) {
129      stopAction.call();
130    }
131    procExecutor.join();
132    procExecutor.getScheduler().clear();
133
134    // nothing running...
135
136    // re-start
137    LOG.info("RESTART - Start");
138    procStore.start(storeThreads);
139    initAndStartWorkers(procExecutor, execThreads, urgentThreads, failOnCorrupted, startWorkers);
140    if (startAction != null) {
141      startAction.call();
142    }
143
144    if (avoidTestKillDuringRestart) {
145      procExecutor.testing = testing;
146    }
147  }
148
149  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
150      throws Exception {
151    procStore.stop(false);
152    procStore.start(procStore.getNumThreads());
153    procStore.recoverLease();
154    procStore.load(loader);
155  }
156
157  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
158      long runnableCount, int completedCount, int corruptedCount) throws Exception {
159    final LoadCounter loader = new LoadCounter();
160    storeRestart(procStore, loader);
161    assertEquals(maxProcId, loader.getMaxProcId());
162    assertEquals(runnableCount, loader.getRunnableCount());
163    assertEquals(completedCount, loader.getCompletedCount());
164    assertEquals(corruptedCount, loader.getCorruptedCount());
165    return loader;
166  }
167
168  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
169    if (procExecutor.testing == null) {
170      procExecutor.testing = new ProcedureExecutor.Testing();
171    }
172  }
173
174  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
175      boolean value) {
176    createExecutorTesting(procExecutor);
177    procExecutor.testing.killIfSuspended = value;
178  }
179
180  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
181      boolean value) {
182    createExecutorTesting(procExecutor);
183    procExecutor.testing.killBeforeStoreUpdate = value;
184    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
185    assertSingleExecutorForKillTests(procExecutor);
186  }
187
188  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
189      boolean value) {
190    createExecutorTesting(procExecutor);
191    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
192    assertSingleExecutorForKillTests(procExecutor);
193  }
194
195  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
196    createExecutorTesting(procExecutor);
197    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
198    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
199    assertSingleExecutorForKillTests(procExecutor);
200  }
201
202  public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
203    createExecutorTesting(procExecutor);
204    procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate;
205    LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate);
206    assertSingleExecutorForKillTests(procExecutor);
207  }
208
209  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
210      boolean value) {
211    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
212    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
213    assertSingleExecutorForKillTests(procExecutor);
214  }
215
216  private static <TEnv> void assertSingleExecutorForKillTests(
217      final ProcedureExecutor<TEnv> procExecutor) {
218    if (procExecutor.testing == null) return;
219    if (procExecutor.testing.killBeforeStoreUpdate ||
220        procExecutor.testing.toggleKillBeforeStoreUpdate) {
221      assertEquals("expected only one executor running during test with kill/restart",
222        1, procExecutor.getCorePoolSize());
223    }
224  }
225
226  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
227      throws IOException {
228    NoopProcedureStore procStore = new NoopProcedureStore();
229    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
230    procStore.start(1);
231    initAndStartWorkers(procExecutor, 1, false, true);
232    try {
233      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
234    } finally {
235      procStore.stop(false);
236      procExecutor.stop();
237    }
238  }
239
240  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
241    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
242  }
243
244  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
245      final long nonceGroup, final long nonce) {
246    long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
247    waitProcedure(procExecutor, procId);
248    return procId;
249  }
250
251  public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
252      final long nonceGroup, final long nonce) {
253    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
254    long procId = procExecutor.registerNonce(nonceKey);
255    assertFalse(procId >= 0);
256    return procExecutor.submitProcedure(proc, nonceKey);
257  }
258
259  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
260    while (proc.getState() == ProcedureState.INITIALIZING) {
261      Threads.sleepWithoutInterrupt(250);
262    }
263    waitProcedure(procExecutor, proc.getProcId());
264  }
265
266  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
267    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
268      Threads.sleepWithoutInterrupt(250);
269    }
270  }
271
272  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
273    for (int i = 0; i < procIds.length; ++i) {
274      waitProcedure(procExecutor, procIds[i]);
275    }
276  }
277
278  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
279    for (long procId : procExecutor.getActiveProcIds()) {
280      waitProcedure(procExecutor, procId);
281    }
282  }
283
284  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
285    int stableRuns = 0;
286    while (stableRuns < 10) {
287      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
288        stableRuns = 0;
289        Threads.sleepWithoutInterrupt(100);
290      } else {
291        stableRuns++;
292        Threads.sleepWithoutInterrupt(25);
293      }
294    }
295  }
296
297  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
298      long procId) {
299    assertFalse("expected a running proc", procExecutor.isFinished(procId));
300    assertEquals(null, procExecutor.getResult(procId));
301  }
302
303  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
304      long procId) {
305    Procedure<?> result = procExecutor.getResult(procId);
306    assertTrue("expected procedure result", result != null);
307    assertProcNotFailed(result);
308  }
309
310  public static void assertProcNotFailed(final Procedure<?> result) {
311    assertFalse("found exception: " + result.getException(), result.isFailed());
312  }
313
314  public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
315      final long procId) {
316    Procedure<?> result = procExecutor.getResult(procId);
317    assertTrue("expected procedure result", result != null);
318    return assertProcFailed(result);
319  }
320
321  public static Throwable assertProcFailed(final Procedure<?> result) {
322    assertEquals(true, result.isFailed());
323    LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage());
324    return getExceptionCause(result);
325  }
326
327  public static void assertIsAbortException(final Procedure<?> result) {
328    Throwable cause = assertProcFailed(result);
329    assertTrue("expected abort exception, got "+ cause, cause instanceof ProcedureAbortedException);
330  }
331
332  public static void assertIsTimeoutException(final Procedure<?> result) {
333    Throwable cause = assertProcFailed(result);
334    assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
335  }
336
337  public static void assertIsIllegalArgumentException(final Procedure<?> result) {
338    Throwable cause = assertProcFailed(result);
339    assertTrue("expected IllegalArgumentIOException, got " + cause,
340      cause instanceof IllegalArgumentIOException);
341  }
342
343  public static Throwable getExceptionCause(final Procedure<?> procInfo) {
344    assert procInfo.isFailed();
345    Throwable cause = procInfo.getException().getCause();
346    return cause == null ? procInfo.getException() : cause;
347  }
348
349  /**
350   * Run through all procedure flow states TWICE while also restarting
351   * procedure executor at each step; i.e force a reread of procedure store.
352   *
353   *<p>It does
354   * <ol><li>Execute step N - kill the executor before store update
355   * <li>Restart executor/store
356   * <li>Execute step N - and then save to store
357   * </ol>
358   *
359   *<p>This is a good test for finding state that needs persisting and steps that are not
360   * idempotent.
361   */
362  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
363      final long procId) throws Exception {
364    testRecoveryAndDoubleExecution(procExec, procId, false);
365  }
366
367  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
368      final long procId, final boolean expectFailure) throws Exception {
369    testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
370  }
371
372  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
373      final long procId, final boolean expectFailure, final Runnable customRestart)
374      throws Exception {
375    Procedure proc = procExec.getProcedure(procId);
376    waitProcedure(procExec, procId);
377    assertEquals(false, procExec.isRunning());
378    for (int i = 0; !procExec.isFinished(procId); ++i) {
379      proc = procExec.getProcedure(procId);
380      LOG.info("Restart " + i + " exec state: " + proc);
381      if (customRestart != null) {
382        customRestart.run();
383      } else {
384        restart(procExec);
385      }
386      waitProcedure(procExec, procId);
387    }
388
389    assertEquals(true, procExec.isRunning());
390    if (expectFailure) {
391      assertProcFailed(procExec, procId);
392    } else {
393      assertProcNotFailed(procExec, procId);
394    }
395  }
396
397  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
398    public NoopProcedure() {}
399
400    @Override
401    protected Procedure<TEnv>[] execute(TEnv env)
402        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
403      return null;
404    }
405
406    @Override
407    protected void rollback(TEnv env) throws IOException, InterruptedException {
408    }
409
410    @Override
411    protected boolean abort(TEnv env) { return false; }
412
413    @Override
414    protected void serializeStateData(ProcedureStateSerializer serializer)
415        throws IOException {
416    }
417
418    @Override
419    protected void deserializeStateData(ProcedureStateSerializer serializer)
420        throws IOException {
421    }
422  }
423
424  public static class NoopStateMachineProcedure<TEnv, TState>
425      extends StateMachineProcedure<TEnv, TState> {
426    private TState initialState;
427    private TEnv env;
428
429    public NoopStateMachineProcedure() {
430    }
431
432    public NoopStateMachineProcedure(TEnv env, TState initialState) {
433      this.env = env;
434      this.initialState = initialState;
435    }
436
437    @Override
438    protected Flow executeFromState(TEnv env, TState tState)
439        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
440      return null;
441    }
442
443    @Override
444    protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {
445
446    }
447
448    @Override
449    protected TState getState(int stateId) {
450      return null;
451    }
452
453    @Override
454    protected int getStateId(TState tState) {
455      return 0;
456    }
457
458    @Override
459    protected TState getInitialState() {
460      return initialState;
461    }
462  }
463
464  public static class TestProcedure extends NoopProcedure<Void> {
465    private byte[] data = null;
466
467    public TestProcedure() {}
468
469    public TestProcedure(long procId) {
470      this(procId, 0);
471    }
472
473    public TestProcedure(long procId, long parentId) {
474      this(procId, parentId, null);
475    }
476
477    public TestProcedure(long procId, long parentId, byte[] data) {
478      this(procId, parentId, parentId, data);
479    }
480
481    public TestProcedure(long procId, long parentId, long rootId, byte[] data) {
482      setData(data);
483      setProcId(procId);
484      if (parentId > 0) {
485        setParentProcId(parentId);
486      }
487      if (rootId > 0 || parentId > 0) {
488        setRootProcId(rootId);
489      }
490    }
491
492    public void addStackId(final int index) {
493      addStackIndex(index);
494    }
495
496    public void setSuccessState() {
497      setState(ProcedureState.SUCCESS);
498    }
499
500    public void setData(final byte[] data) {
501      this.data = data;
502    }
503
504    @Override
505    protected void serializeStateData(ProcedureStateSerializer serializer)
506        throws IOException {
507      ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
508      BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
509      serializer.serialize(builder.build());
510    }
511
512    @Override
513    protected void deserializeStateData(ProcedureStateSerializer serializer)
514        throws IOException {
515      BytesValue bytesValue = serializer.deserialize(BytesValue.class);
516      ByteString dataString = bytesValue.getValue();
517
518      if (dataString.isEmpty()) {
519        data = null;
520      } else {
521        data = dataString.toByteArray();
522      }
523    }
524
525    // Mark acquire/release lock functions public for test uses.
526    @Override
527    public LockState acquireLock(Void env) {
528      return LockState.LOCK_ACQUIRED;
529    }
530
531    @Override
532    public void releaseLock(Void env) {
533      // no-op
534    }
535  }
536
537  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
538    private final ArrayList<Procedure> corrupted = new ArrayList<>();
539    private final ArrayList<Procedure> completed = new ArrayList<>();
540    private final ArrayList<Procedure> runnable = new ArrayList<>();
541
542    private Set<Long> procIds;
543    private long maxProcId = 0;
544
545    public LoadCounter() {
546      this(null);
547    }
548
549    public LoadCounter(final Set<Long> procIds) {
550      this.procIds = procIds;
551    }
552
553    public void reset() {
554      reset(null);
555    }
556
557    public void reset(final Set<Long> procIds) {
558      corrupted.clear();
559      completed.clear();
560      runnable.clear();
561      this.procIds = procIds;
562      this.maxProcId = 0;
563    }
564
565    public long getMaxProcId() {
566      return maxProcId;
567    }
568
569    public ArrayList<Procedure> getRunnables() {
570      return runnable;
571    }
572
573    public int getRunnableCount() {
574      return runnable.size();
575    }
576
577    public ArrayList<Procedure> getCompleted() {
578      return completed;
579    }
580
581    public int getCompletedCount() {
582      return completed.size();
583    }
584
585    public int getLoadedCount() {
586      return runnable.size() + completed.size();
587    }
588
589    public ArrayList<Procedure> getCorrupted() {
590      return corrupted;
591    }
592
593    public int getCorruptedCount() {
594      return corrupted.size();
595    }
596
597    public boolean isRunnable(final long procId) {
598      for (Procedure proc: runnable) {
599        if (proc.getProcId() == procId) {
600          return true;
601        }
602      }
603      return false;
604    }
605
606    @Override
607    public void setMaxProcId(long maxProcId) {
608      this.maxProcId = maxProcId;
609    }
610
611    @Override
612    public void load(ProcedureIterator procIter) throws IOException {
613      while (procIter.hasNext()) {
614        long procId;
615        if (procIter.isNextFinished()) {
616          Procedure<?> proc = procIter.next();
617          procId = proc.getProcId();
618          LOG.debug("loading completed procId=" + procId + ": " + proc);
619          completed.add(proc);
620        } else {
621          Procedure proc = procIter.next();
622          procId = proc.getProcId();
623          LOG.debug("loading runnable procId=" + procId + ": " + proc);
624          runnable.add(proc);
625        }
626        if (procIds != null) {
627          assertTrue("procId=" + procId + " unexpected", procIds.contains(procId));
628        }
629      }
630    }
631
632    @Override
633    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
634      while (procIter.hasNext()) {
635        Procedure proc = procIter.next();
636        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
637        corrupted.add(proc);
638      }
639    }
640  }
641}