001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
033import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
034import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
035import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
036import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
037import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
038import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
039import org.apache.hadoop.hbase.util.NonceKey;
040import org.apache.hadoop.hbase.util.Threads;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
045import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
046
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
048
049public final class ProcedureTestingUtility {
050  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);
051
052  private ProcedureTestingUtility() {
053  }
054
055  public static ProcedureStore createStore(final Configuration conf, final Path dir)
056    throws IOException {
057    return createWalStore(conf, dir);
058  }
059
060  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
061    throws IOException {
062    return new WALProcedureStore(conf, dir, null, new LeaseRecovery() {
063      @Override
064      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
065        // no-op
066      }
067    });
068  }
069
070  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort,
071    boolean startWorkers) throws Exception {
072    restart(procExecutor, false, true, null, null, null, abort, startWorkers);
073  }
074
075  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort)
076    throws Exception {
077    restart(procExecutor, false, true, null, null, null, abort, true);
078  }
079
080  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
081    restart(procExecutor, false, true, null, null, null, false, true);
082  }
083
084  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
085    boolean abortOnCorruption) throws IOException {
086    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
087  }
088
089  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
090    boolean abortOnCorruption, boolean startWorkers) throws IOException {
091    procExecutor.init(numThreads, abortOnCorruption);
092    if (startWorkers) {
093      procExecutor.startWorkers();
094    }
095  }
096
097  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
098    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
099    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
100    restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction,
101      actionBeforeStartWorker, startAction, false, true);
102  }
103
104  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
105    boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
106    Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
107    boolean startWorkers) throws Exception {
108    final ProcedureStore procStore = procExecutor.getStore();
109    final int storeThreads = procExecutor.getCorePoolSize();
110    final int execThreads = procExecutor.getCorePoolSize();
111
112    final ProcedureExecutor.Testing testing = procExecutor.testing;
113    if (avoidTestKillDuringRestart) {
114      procExecutor.testing = null;
115    }
116
117    // stop
118    LOG.info("RESTART - Stop");
119    procExecutor.stop();
120    procStore.stop(abort);
121    if (stopAction != null) {
122      stopAction.call();
123    }
124    procExecutor.join();
125    procExecutor.getScheduler().clear();
126
127    // nothing running...
128
129    // re-start
130    LOG.info("RESTART - Start");
131    procStore.start(storeThreads);
132    procExecutor.init(execThreads, failOnCorrupted);
133    if (actionBeforeStartWorker != null) {
134      actionBeforeStartWorker.call();
135    }
136    if (avoidTestKillDuringRestart) {
137      procExecutor.testing = testing;
138    }
139    if (startWorkers) {
140      procExecutor.startWorkers();
141    }
142    if (startAction != null) {
143      startAction.call();
144    }
145  }
146
147  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
148    throws Exception {
149    storeRestart(procStore, false, loader);
150  }
151
152  public static void storeRestart(ProcedureStore procStore, boolean abort,
153    ProcedureStore.ProcedureLoader loader) throws Exception {
154    procStore.stop(abort);
155    procStore.start(procStore.getNumThreads());
156    procStore.recoverLease();
157    procStore.load(loader);
158  }
159
160  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
161    long runnableCount, int completedCount, int corruptedCount) throws Exception {
162    final LoadCounter loader = new LoadCounter();
163    storeRestart(procStore, loader);
164    assertEquals(maxProcId, loader.getMaxProcId());
165    assertEquals(runnableCount, loader.getRunnableCount());
166    assertEquals(completedCount, loader.getCompletedCount());
167    assertEquals(corruptedCount, loader.getCorruptedCount());
168    return loader;
169  }
170
171  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
172    if (procExecutor.testing == null) {
173      procExecutor.testing = new ProcedureExecutor.Testing();
174    }
175  }
176
177  public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
178    boolean value) {
179    createExecutorTesting(procExecutor);
180    procExecutor.testing.killIfHasParent = value;
181  }
182
183  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
184    boolean value) {
185    createExecutorTesting(procExecutor);
186    procExecutor.testing.killIfSuspended = value;
187  }
188
189  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
190    boolean value) {
191    createExecutorTesting(procExecutor);
192    procExecutor.testing.killBeforeStoreUpdate = value;
193    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
194    assertSingleExecutorForKillTests(procExecutor);
195  }
196
197  public static <TEnv> void setKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor,
198    boolean value) {
199    createExecutorTesting(procExecutor);
200    procExecutor.testing.killBeforeStoreUpdateInRollback = value;
201    LOG.warn("Set Kill before store update in rollback to: "
202      + procExecutor.testing.killBeforeStoreUpdateInRollback);
203    assertSingleExecutorForKillTests(procExecutor);
204  }
205
206  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
207    boolean value) {
208    createExecutorTesting(procExecutor);
209    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
210    assertSingleExecutorForKillTests(procExecutor);
211  }
212
213  public static <TEnv> void
214    setToggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor, boolean value) {
215    createExecutorTesting(procExecutor);
216    procExecutor.testing.toggleKillBeforeStoreUpdateInRollback = value;
217    assertSingleExecutorForKillTests(procExecutor);
218  }
219
220  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
221    createExecutorTesting(procExecutor);
222    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
223    LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
224    assertSingleExecutorForKillTests(procExecutor);
225  }
226
227  public static <TEnv> void
228    toggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor) {
229    createExecutorTesting(procExecutor);
230    procExecutor.testing.killBeforeStoreUpdateInRollback =
231      !procExecutor.testing.killBeforeStoreUpdateInRollback;
232    LOG.warn("Set Kill before store update to in rollback: "
233      + procExecutor.testing.killBeforeStoreUpdateInRollback);
234    assertSingleExecutorForKillTests(procExecutor);
235  }
236
237  public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
238    createExecutorTesting(procExecutor);
239    procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate;
240    LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate);
241    assertSingleExecutorForKillTests(procExecutor);
242  }
243
244  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
245    boolean value) {
246    setKillBeforeStoreUpdate(procExecutor, value);
247    setToggleKillBeforeStoreUpdate(procExecutor, value);
248    assertSingleExecutorForKillTests(procExecutor);
249  }
250
251  public static <TEnv> void setKillAndToggleBeforeStoreUpdateInRollback(
252    ProcedureExecutor<TEnv> procExecutor, boolean value) {
253    setKillBeforeStoreUpdateInRollback(procExecutor, value);
254    setToggleKillBeforeStoreUpdateInRollback(procExecutor, value);
255    assertSingleExecutorForKillTests(procExecutor);
256  }
257
258  private static <TEnv> void
259    assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) {
260    if (procExecutor.testing == null) {
261      return;
262    }
263
264    if (
265      procExecutor.testing.killBeforeStoreUpdate || procExecutor.testing.toggleKillBeforeStoreUpdate
266    ) {
267      assertEquals("expected only one executor running during test with kill/restart", 1,
268        procExecutor.getCorePoolSize());
269    }
270  }
271
272  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
273    throws IOException {
274    NoopProcedureStore procStore = new NoopProcedureStore();
275    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
276    procStore.start(1);
277    initAndStartWorkers(procExecutor, 1, false, true);
278    try {
279      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
280    } finally {
281      procStore.stop(false);
282      procExecutor.stop();
283    }
284  }
285
286  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
287    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
288  }
289
290  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
291    final long nonceGroup, final long nonce) {
292    long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
293    waitProcedure(procExecutor, procId);
294    return procId;
295  }
296
297  public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
298    final long nonceGroup, final long nonce) {
299    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
300    long procId = procExecutor.registerNonce(nonceKey);
301    assertFalse(procId >= 0);
302    return procExecutor.submitProcedure(proc, nonceKey);
303  }
304
305  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
306    while (proc.getState() == ProcedureState.INITIALIZING) {
307      Threads.sleepWithoutInterrupt(250);
308    }
309    waitProcedure(procExecutor, proc.getProcId());
310  }
311
312  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
313    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
314      Threads.sleepWithoutInterrupt(250);
315    }
316  }
317
318  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
319    for (int i = 0; i < procIds.length; ++i) {
320      waitProcedure(procExecutor, procIds[i]);
321    }
322  }
323
324  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
325    for (long procId : procExecutor.getActiveProcIds()) {
326      waitProcedure(procExecutor, procId);
327    }
328  }
329
330  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
331    int stableRuns = 0;
332    while (stableRuns < 10) {
333      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
334        stableRuns = 0;
335        Threads.sleepWithoutInterrupt(100);
336      } else {
337        stableRuns++;
338        Threads.sleepWithoutInterrupt(25);
339      }
340    }
341  }
342
343  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
344    long procId) {
345    assertFalse("expected a running proc", procExecutor.isFinished(procId));
346    assertEquals(null, procExecutor.getResult(procId));
347  }
348
349  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) {
350    Procedure<?> result = procExecutor.getResult(procId);
351    assertTrue("expected procedure result", result != null);
352    assertProcNotFailed(result);
353  }
354
355  public static void assertProcNotFailed(final Procedure<?> result) {
356    assertFalse("found exception: " + result.getException(), result.isFailed());
357  }
358
359  public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
360    final long procId) {
361    Procedure<?> result = procExecutor.getResult(procId);
362    assertTrue("expected procedure result", result != null);
363    return assertProcFailed(result);
364  }
365
366  public static Throwable assertProcFailed(final Procedure<?> result) {
367    assertEquals(true, result.isFailed());
368    LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage());
369    return getExceptionCause(result);
370  }
371
372  public static void assertIsAbortException(final Procedure<?> result) {
373    Throwable cause = assertProcFailed(result);
374    assertTrue("expected abort exception, got " + cause,
375      cause instanceof ProcedureAbortedException);
376  }
377
378  public static void assertIsTimeoutException(final Procedure<?> result) {
379    Throwable cause = assertProcFailed(result);
380    assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
381  }
382
383  public static void assertIsIllegalArgumentException(final Procedure<?> result) {
384    Throwable cause = assertProcFailed(result);
385    assertTrue("expected IllegalArgumentIOException, got " + cause,
386      cause instanceof IllegalArgumentIOException);
387  }
388
389  public static Throwable getExceptionCause(final Procedure<?> procInfo) {
390    assert procInfo.isFailed();
391    Throwable cause = procInfo.getException().getCause();
392    return cause == null ? procInfo.getException() : cause;
393  }
394
395  /**
396   * Run through all procedure flow states TWICE while also restarting procedure executor at each
397   * step; i.e force a reread of procedure store.
398   * <p>
399   * It does
400   * <ol>
401   * <li>Execute step N - kill the executor before store update
402   * <li>Restart executor/store
403   * <li>Execute step N - and then save to store
404   * </ol>
405   * <p>
406   * This is a good test for finding state that needs persisting and steps that are not idempotent.
407   */
408  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
409    final long procId) throws Exception {
410    testRecoveryAndDoubleExecution(procExec, procId, false);
411  }
412
413  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
414    final long procId, final boolean expectFailure) throws Exception {
415    testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
416  }
417
418  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
419    final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception {
420    Procedure proc = procExec.getProcedure(procId);
421    waitProcedure(procExec, procId);
422    assertEquals(false, procExec.isRunning());
423    for (int i = 0; !procExec.isFinished(procId); ++i) {
424      proc = procExec.getProcedure(procId);
425      LOG.info("Restart " + i + " exec state: " + proc);
426      if (customRestart != null) {
427        customRestart.run();
428      } else {
429        restart(procExec);
430      }
431      waitProcedure(procExec, procId);
432    }
433
434    assertEquals(true, procExec.isRunning());
435    if (expectFailure) {
436      assertProcFailed(procExec, procId);
437    } else {
438      assertProcNotFailed(procExec, procId);
439    }
440  }
441
442  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
443    public NoopProcedure() {
444    }
445
446    @Override
447    protected Procedure<TEnv>[] execute(TEnv env)
448      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
449      return null;
450    }
451
452    @Override
453    protected void rollback(TEnv env) throws IOException, InterruptedException {
454    }
455
456    @Override
457    protected boolean abort(TEnv env) {
458      return false;
459    }
460
461    @Override
462    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
463    }
464
465    @Override
466    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
467    }
468  }
469
470  public static class NoopStateMachineProcedure<TEnv, TState>
471    extends StateMachineProcedure<TEnv, TState> {
472    private TState initialState;
473    private TEnv env;
474
475    public NoopStateMachineProcedure() {
476    }
477
478    public NoopStateMachineProcedure(TEnv env, TState initialState) {
479      this.env = env;
480      this.initialState = initialState;
481    }
482
483    @Override
484    protected Flow executeFromState(TEnv env, TState tState)
485      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
486      return null;
487    }
488
489    @Override
490    protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {
491
492    }
493
494    @Override
495    protected TState getState(int stateId) {
496      return null;
497    }
498
499    @Override
500    protected int getStateId(TState tState) {
501      return 0;
502    }
503
504    @Override
505    protected TState getInitialState() {
506      return initialState;
507    }
508  }
509
510  public static class TestProcedure extends NoopProcedure<Void> {
511    private byte[] data = null;
512
513    public TestProcedure() {
514    }
515
516    public TestProcedure(long procId) {
517      this(procId, 0);
518    }
519
520    public TestProcedure(long procId, long parentId) {
521      this(procId, parentId, null);
522    }
523
524    public TestProcedure(long procId, long parentId, byte[] data) {
525      this(procId, parentId, parentId, data);
526    }
527
528    public TestProcedure(long procId, long parentId, long rootId, byte[] data) {
529      setData(data);
530      setProcId(procId);
531      if (parentId > 0) {
532        setParentProcId(parentId);
533      }
534      if (rootId > 0 || parentId > 0) {
535        setRootProcId(rootId);
536      }
537    }
538
539    public void addStackId(final int index) {
540      addStackIndex(index);
541    }
542
543    public void setSuccessState() {
544      setState(ProcedureState.SUCCESS);
545    }
546
547    public void setData(final byte[] data) {
548      this.data = data;
549    }
550
551    @Override
552    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
553      ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
554      BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
555      serializer.serialize(builder.build());
556    }
557
558    @Override
559    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
560      BytesValue bytesValue = serializer.deserialize(BytesValue.class);
561      ByteString dataString = bytesValue.getValue();
562
563      if (dataString.isEmpty()) {
564        data = null;
565      } else {
566        data = dataString.toByteArray();
567      }
568    }
569
570    // Mark acquire/release lock functions public for test uses.
571    @Override
572    public LockState acquireLock(Void env) {
573      return LockState.LOCK_ACQUIRED;
574    }
575
576    @Override
577    public void releaseLock(Void env) {
578      // no-op
579    }
580  }
581
582  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
583    private final ArrayList<Procedure> corrupted = new ArrayList<>();
584    private final ArrayList<Procedure> completed = new ArrayList<>();
585    private final ArrayList<Procedure> runnable = new ArrayList<>();
586
587    private Set<Long> procIds;
588    private long maxProcId = 0;
589
590    public LoadCounter() {
591      this(null);
592    }
593
594    public LoadCounter(final Set<Long> procIds) {
595      this.procIds = procIds;
596    }
597
598    public void reset() {
599      reset(null);
600    }
601
602    public void reset(final Set<Long> procIds) {
603      corrupted.clear();
604      completed.clear();
605      runnable.clear();
606      this.procIds = procIds;
607      this.maxProcId = 0;
608    }
609
610    public long getMaxProcId() {
611      return maxProcId;
612    }
613
614    public ArrayList<Procedure> getRunnables() {
615      return runnable;
616    }
617
618    public int getRunnableCount() {
619      return runnable.size();
620    }
621
622    public ArrayList<Procedure> getCompleted() {
623      return completed;
624    }
625
626    public int getCompletedCount() {
627      return completed.size();
628    }
629
630    public int getLoadedCount() {
631      return runnable.size() + completed.size();
632    }
633
634    public ArrayList<Procedure> getCorrupted() {
635      return corrupted;
636    }
637
638    public int getCorruptedCount() {
639      return corrupted.size();
640    }
641
642    public boolean isRunnable(final long procId) {
643      for (Procedure proc : runnable) {
644        if (proc.getProcId() == procId) {
645          return true;
646        }
647      }
648      return false;
649    }
650
651    @Override
652    public void setMaxProcId(long maxProcId) {
653      this.maxProcId = maxProcId;
654    }
655
656    @Override
657    public void load(ProcedureIterator procIter) throws IOException {
658      while (procIter.hasNext()) {
659        long procId;
660        if (procIter.isNextFinished()) {
661          Procedure<?> proc = procIter.next();
662          procId = proc.getProcId();
663          LOG.debug("loading completed procId=" + procId + ": " + proc);
664          completed.add(proc);
665        } else {
666          Procedure proc = procIter.next();
667          procId = proc.getProcId();
668          LOG.debug("loading runnable procId=" + procId + ": " + proc);
669          runnable.add(proc);
670        }
671        if (procIds != null) {
672          assertTrue("procId=" + procId + " unexpected", procIds.contains(procId));
673        }
674      }
675    }
676
677    @Override
678    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
679      while (procIter.hasNext()) {
680        Procedure proc = procIter.next();
681        LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc);
682        corrupted.add(proc);
683      }
684    }
685  }
686}