001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertInstanceOf;
023import static org.junit.jupiter.api.Assertions.assertNotNull;
024import static org.junit.jupiter.api.Assertions.assertNull;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Set;
030import java.util.concurrent.Callable;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
036import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
037import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
038import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
039import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
040import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
041import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
042import org.apache.hadoop.hbase.util.NonceKey;
043import org.apache.hadoop.hbase.util.Threads;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
048import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
049
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
051
052public final class ProcedureTestingUtility {
053  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);
054
055  private ProcedureTestingUtility() {
056  }
057
058  public static ProcedureStore createStore(final Configuration conf, final Path dir)
059    throws IOException {
060    return createWalStore(conf, dir);
061  }
062
063  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
064    throws IOException {
065    return new WALProcedureStore(conf, dir, null, new LeaseRecovery() {
066      @Override
067      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
068        // no-op
069      }
070    });
071  }
072
073  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort,
074    boolean startWorkers) throws Exception {
075    restart(procExecutor, false, true, null, null, null, abort, startWorkers);
076  }
077
078  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort)
079    throws Exception {
080    restart(procExecutor, false, true, null, null, null, abort, true);
081  }
082
083  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
084    restart(procExecutor, false, true, null, null, null, false, true);
085  }
086
087  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
088    boolean abortOnCorruption) throws IOException {
089    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
090  }
091
092  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
093    boolean abortOnCorruption, boolean startWorkers) throws IOException {
094    procExecutor.init(numThreads, abortOnCorruption);
095    if (startWorkers) {
096      procExecutor.startWorkers();
097    }
098  }
099
100  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
101    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
102    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
103    restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction,
104      actionBeforeStartWorker, startAction, false, true);
105  }
106
107  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
108    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
109    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
110    boolean startWorkers) throws Exception {
111    final ProcedureStore procStore = procExecutor.getStore();
112    final int storeThreads = procExecutor.getCorePoolSize();
113    final int execThreads = procExecutor.getCorePoolSize();
114
115    final ProcedureExecutor.Testing testing = procExecutor.testing;
116    if (avoidTestKillDuringRestart) {
117      procExecutor.testing = null;
118    }
119
120    // stop
121    LOG.info("RESTART - Stop");
122    procExecutor.stop();
123    procStore.stop(abort);
124    if (stopAction != null) {
125      stopAction.call();
126    }
127    procExecutor.join();
128    procExecutor.getScheduler().clear();
129
130    // nothing running...
131
132    // re-start
133    LOG.info("RESTART - Start");
134    procStore.start(storeThreads);
135    procExecutor.init(execThreads, failOnCorrupted);
136    if (actionBeforeStartWorker != null) {
137      actionBeforeStartWorker.call();
138    }
139    if (avoidTestKillDuringRestart) {
140      procExecutor.testing = testing;
141    }
142    if (startWorkers) {
143      procExecutor.startWorkers();
144    }
145    if (startAction != null) {
146      startAction.call();
147    }
148  }
149
150  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
151    throws Exception {
152    storeRestart(procStore, false, loader);
153  }
154
155  public static void storeRestart(ProcedureStore procStore, boolean abort,
156    ProcedureStore.ProcedureLoader loader) throws Exception {
157    procStore.stop(abort);
158    procStore.start(procStore.getNumThreads());
159    procStore.recoverLease();
160    procStore.load(loader);
161  }
162
163  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
164    long runnableCount, int completedCount, int corruptedCount) throws Exception {
165    final LoadCounter loader = new LoadCounter();
166    storeRestart(procStore, loader);
167    assertEquals(maxProcId, loader.getMaxProcId());
168    assertEquals(runnableCount, loader.getRunnableCount());
169    assertEquals(completedCount, loader.getCompletedCount());
170    assertEquals(corruptedCount, loader.getCorruptedCount());
171    return loader;
172  }
173
174  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
175    if (procExecutor.testing == null) {
176      procExecutor.testing = new ProcedureExecutor.Testing();
177    }
178  }
179
180  public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
181    boolean value) {
182    createExecutorTesting(procExecutor);
183    procExecutor.testing.killIfHasParent = value;
184  }
185
186  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
187    boolean value) {
188    createExecutorTesting(procExecutor);
189    procExecutor.testing.killIfSuspended = value;
190  }
191
192  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
193    boolean value) {
194    createExecutorTesting(procExecutor);
195    procExecutor.testing.killBeforeStoreUpdate = value;
196    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
197    assertSingleExecutorForKillTests(procExecutor);
198  }
199
200  public static <TEnv> void setKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor,
201    boolean value) {
202    createExecutorTesting(procExecutor);
203    procExecutor.testing.killBeforeStoreUpdateInRollback = value;
204    LOG.warn("Set Kill before store update in rollback to: "
205      + procExecutor.testing.killBeforeStoreUpdateInRollback);
206    assertSingleExecutorForKillTests(procExecutor);
207  }
208
209  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
210    boolean value) {
211    createExecutorTesting(procExecutor);
212    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
213    assertSingleExecutorForKillTests(procExecutor);
214  }
215
216  public static <TEnv> void
217    setToggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor, boolean value) {
218    createExecutorTesting(procExecutor);
219    procExecutor.testing.toggleKillBeforeStoreUpdateInRollback = value;
220    assertSingleExecutorForKillTests(procExecutor);
221  }
222
223  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
224    createExecutorTesting(procExecutor);
225    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
226    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
227    assertSingleExecutorForKillTests(procExecutor);
228  }
229
230  public static <TEnv> void
231    toggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor) {
232    createExecutorTesting(procExecutor);
233    procExecutor.testing.killBeforeStoreUpdateInRollback =
234      !procExecutor.testing.killBeforeStoreUpdateInRollback;
235    LOG.warn("Set Kill before store update to in rollback: "
236      + procExecutor.testing.killBeforeStoreUpdateInRollback);
237    assertSingleExecutorForKillTests(procExecutor);
238  }
239
240  public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
241    createExecutorTesting(procExecutor);
242    procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate;
243    LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate);
244    assertSingleExecutorForKillTests(procExecutor);
245  }
246
247  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
248    boolean value) {
249    setKillBeforeStoreUpdate(procExecutor, value);
250    setToggleKillBeforeStoreUpdate(procExecutor, value);
251    assertSingleExecutorForKillTests(procExecutor);
252  }
253
254  public static <TEnv> void setKillAndToggleBeforeStoreUpdateInRollback(
255    ProcedureExecutor<TEnv> procExecutor, boolean value) {
256    setKillBeforeStoreUpdateInRollback(procExecutor, value);
257    setToggleKillBeforeStoreUpdateInRollback(procExecutor, value);
258    assertSingleExecutorForKillTests(procExecutor);
259  }
260
261  private static <TEnv> void
262    assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) {
263    if (procExecutor.testing == null) {
264      return;
265    }
266
267    if (
268      procExecutor.testing.killBeforeStoreUpdate || procExecutor.testing.toggleKillBeforeStoreUpdate
269    ) {
270      assertEquals(1, procExecutor.getCorePoolSize(),
271        "expected only one executor running during test with kill/restart");
272    }
273  }
274
275  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
276    throws IOException {
277    NoopProcedureStore procStore = new NoopProcedureStore();
278    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
279    procStore.start(1);
280    initAndStartWorkers(procExecutor, 1, false, true);
281    try {
282      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
283    } finally {
284      procStore.stop(false);
285      procExecutor.stop();
286    }
287  }
288
289  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
290    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
291  }
292
293  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
294    final long nonceGroup, final long nonce) {
295    long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
296    waitProcedure(procExecutor, procId);
297    return procId;
298  }
299
300  public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
301    final long nonceGroup, final long nonce) {
302    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
303    long procId = procExecutor.registerNonce(nonceKey);
304    assertFalse(procId >= 0);
305    return procExecutor.submitProcedure(proc, nonceKey);
306  }
307
308  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
309    while (proc.getState() == ProcedureState.INITIALIZING) {
310      Threads.sleepWithoutInterrupt(250);
311    }
312    waitProcedure(procExecutor, proc.getProcId());
313  }
314
315  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
316    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
317      Threads.sleepWithoutInterrupt(250);
318    }
319  }
320
321  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
322    for (int i = 0; i < procIds.length; ++i) {
323      waitProcedure(procExecutor, procIds[i]);
324    }
325  }
326
327  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
328    for (long procId : procExecutor.getActiveProcIds()) {
329      waitProcedure(procExecutor, procId);
330    }
331  }
332
333  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
334    int stableRuns = 0;
335    while (stableRuns < 10) {
336      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
337        stableRuns = 0;
338        Threads.sleepWithoutInterrupt(100);
339      } else {
340        stableRuns++;
341        Threads.sleepWithoutInterrupt(25);
342      }
343    }
344  }
345
346  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
347    long procId) {
348    assertFalse(procExecutor.isFinished(procId), "expected a running proc");
349    assertNull(procExecutor.getResult(procId));
350  }
351
352  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) {
353    Procedure<?> result = procExecutor.getResult(procId);
354    assertNotNull(result, "expected procedure result");
355    assertProcNotFailed(result);
356  }
357
358  public static void assertProcNotFailed(final Procedure<?> result) {
359    assertFalse(result.isFailed(), "found exception: " + result.getException());
360  }
361
362  public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
363    final long procId) {
364    Procedure<?> result = procExecutor.getResult(procId);
365    assertNotNull(result, "expected procedure result");
366    return assertProcFailed(result);
367  }
368
369  public static Throwable assertProcFailed(final Procedure<?> result) {
370    assertTrue(result.isFailed());
371    LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage());
372    return getExceptionCause(result);
373  }
374
375  public static void assertIsAbortException(final Procedure<?> result) {
376    Throwable cause = assertProcFailed(result);
377    assertInstanceOf(ProcedureAbortedException.class, cause,
378      "expected abort exception, got " + cause);
379  }
380
381  public static void assertIsTimeoutException(final Procedure<?> result) {
382    Throwable cause = assertProcFailed(result);
383    assertInstanceOf(TimeoutIOException.class, cause, "expected TimeoutIOException, got " + cause);
384  }
385
386  public static void assertIsIllegalArgumentException(final Procedure<?> result) {
387    Throwable cause = assertProcFailed(result);
388    assertInstanceOf(IllegalArgumentIOException.class, cause,
389      "expected IllegalArgumentIOException, got " + cause);
390  }
391
392  public static Throwable getExceptionCause(final Procedure<?> procInfo) {
393    assert procInfo.isFailed();
394    Throwable cause = procInfo.getException().getCause();
395    return cause == null ? procInfo.getException() : cause;
396  }
397
398  /**
399   * Run through all procedure flow states TWICE while also restarting procedure executor at each
400   * step; i.e force a reread of procedure store.
401   * <p>
402   * It does
403   * <ol>
404   * <li>Execute step N - kill the executor before store update
405   * <li>Restart executor/store
406   * <li>Execute step N - and then save to store
407   * </ol>
408   * <p>
409   * This is a good test for finding state that needs persisting and steps that are not idempotent.
410   */
411  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
412    final long procId) throws Exception {
413    testRecoveryAndDoubleExecution(procExec, procId, false);
414  }
415
416  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
417    final long procId, final boolean expectFailure) throws Exception {
418    testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
419  }
420
421  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
422    final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception {
423    Procedure proc = procExec.getProcedure(procId);
424    waitProcedure(procExec, procId);
425    assertEquals(false, procExec.isRunning());
426    for (int i = 0; !procExec.isFinished(procId); ++i) {
427      proc = procExec.getProcedure(procId);
428      LOG.info("Restart " + i + " exec state: " + proc);
429      if (customRestart != null) {
430        customRestart.run();
431      } else {
432        restart(procExec);
433      }
434      waitProcedure(procExec, procId);
435    }
436
437    assertEquals(true, procExec.isRunning());
438    if (expectFailure) {
439      assertProcFailed(procExec, procId);
440    } else {
441      assertProcNotFailed(procExec, procId);
442    }
443  }
444
445  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
446    public NoopProcedure() {
447    }
448
449    @Override
450    protected Procedure<TEnv>[] execute(TEnv env)
451      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
452      return null;
453    }
454
455    @Override
456    protected void rollback(TEnv env) throws IOException, InterruptedException {
457    }
458
459    @Override
460    protected boolean abort(TEnv env) {
461      return false;
462    }
463
464    @Override
465    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
466    }
467
468    @Override
469    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
470    }
471  }
472
473  public static class NoopStateMachineProcedure<TEnv, TState>
474    extends StateMachineProcedure<TEnv, TState> {
475    private TState initialState;
476    private TEnv env;
477
478    public NoopStateMachineProcedure() {
479    }
480
481    public NoopStateMachineProcedure(TEnv env, TState initialState) {
482      this.env = env;
483      this.initialState = initialState;
484    }
485
486    @Override
487    protected Flow executeFromState(TEnv env, TState tState)
488      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
489      return null;
490    }
491
492    @Override
493    protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {
494
495    }
496
497    @Override
498    protected TState getState(int stateId) {
499      return null;
500    }
501
502    @Override
503    protected int getStateId(TState tState) {
504      return 0;
505    }
506
507    @Override
508    protected TState getInitialState() {
509      return initialState;
510    }
511  }
512
513  public static class TestProcedure extends NoopProcedure<Void> {
514    private byte[] data = null;
515
516    public TestProcedure() {
517    }
518
519    public TestProcedure(long procId) {
520      this(procId, 0);
521    }
522
523    public TestProcedure(long procId, long parentId) {
524      this(procId, parentId, null);
525    }
526
527    public TestProcedure(long procId, long parentId, byte[] data) {
528      this(procId, parentId, parentId, data);
529    }
530
531    public TestProcedure(long procId, long parentId, long rootId, byte[] data) {
532      setData(data);
533      setProcId(procId);
534      if (parentId > 0) {
535        setParentProcId(parentId);
536      }
537      if (rootId > 0 || parentId > 0) {
538        setRootProcId(rootId);
539      }
540    }
541
542    public void addStackId(final int index) {
543      addStackIndex(index);
544    }
545
546    public void setSuccessState() {
547      setState(ProcedureState.SUCCESS);
548    }
549
550    public void setData(final byte[] data) {
551      this.data = data;
552    }
553
554    @Override
555    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
556      ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
557      BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
558      serializer.serialize(builder.build());
559    }
560
561    @Override
562    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
563      BytesValue bytesValue = serializer.deserialize(BytesValue.class);
564      ByteString dataString = bytesValue.getValue();
565
566      if (dataString.isEmpty()) {
567        data = null;
568      } else {
569        data = dataString.toByteArray();
570      }
571    }
572
573    // Mark acquire/release lock functions public for test uses.
574    @Override
575    public LockState acquireLock(Void env) {
576      return LockState.LOCK_ACQUIRED;
577    }
578
579    @Override
580    public void releaseLock(Void env) {
581      // no-op
582    }
583  }
584
585  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
586    private final ArrayList<Procedure> corrupted = new ArrayList<>();
587    private final ArrayList<Procedure> completed = new ArrayList<>();
588    private final ArrayList<Procedure> runnable = new ArrayList<>();
589
590    private Set<Long> procIds;
591    private long maxProcId = 0;
592
593    public LoadCounter() {
594      this(null);
595    }
596
597    public LoadCounter(final Set<Long> procIds) {
598      this.procIds = procIds;
599    }
600
601    public void reset() {
602      reset(null);
603    }
604
605    public void reset(final Set<Long> procIds) {
606      corrupted.clear();
607      completed.clear();
608      runnable.clear();
609      this.procIds = procIds;
610      this.maxProcId = 0;
611    }
612
613    public long getMaxProcId() {
614      return maxProcId;
615    }
616
617    public ArrayList<Procedure> getRunnables() {
618      return runnable;
619    }
620
621    public int getRunnableCount() {
622      return runnable.size();
623    }
624
625    public ArrayList<Procedure> getCompleted() {
626      return completed;
627    }
628
629    public int getCompletedCount() {
630      return completed.size();
631    }
632
633    public int getLoadedCount() {
634      return runnable.size() + completed.size();
635    }
636
637    public ArrayList<Procedure> getCorrupted() {
638      return corrupted;
639    }
640
641    public int getCorruptedCount() {
642      return corrupted.size();
643    }
644
645    public boolean isRunnable(final long procId) {
646      for (Procedure proc : runnable) {
647        if (proc.getProcId() == procId) {
648          return true;
649        }
650      }
651      return false;
652    }
653
654    @Override
655    public void setMaxProcId(long maxProcId) {
656      this.maxProcId = maxProcId;
657    }
658
659    @Override
660    public void load(ProcedureIterator procIter) throws IOException {
661      while (procIter.hasNext()) {
662        long procId;
663        if (procIter.isNextFinished()) {
664          Procedure<?> proc = procIter.next();
665          procId = proc.getProcId();
666          LOG.debug("loading completed procId=" + procId + ": " + proc);
667          completed.add(proc);
668        } else {
669          Procedure proc = procIter.next();
670          procId = proc.getProcId();
671          LOG.debug("loading runnable procId=" + procId + ": " + proc);
672          runnable.add(proc);
673        }
674        if (procIds != null) {
675          assertTrue(procIds.contains(procId), "procId=" + procId + " unexpected");
676        }
677      }
678    }
679
680    @Override
681    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
682      while (procIter.hasNext()) {
683        Procedure proc = procIter.next();
684        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
685        corrupted.add(proc);
686      }
687    }
688  }
689}