001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Set; 027import java.util.concurrent.Callable; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 033import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 034import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 035import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 036import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 037import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 038import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 039import org.apache.hadoop.hbase.util.NonceKey; 040import org.apache.hadoop.hbase.util.Threads; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 045import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue; 046 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 048 049public final class ProcedureTestingUtility { 050 private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class); 051 052 private ProcedureTestingUtility() { 053 } 054 055 public static ProcedureStore createStore(final Configuration conf, final Path dir) 056 throws IOException { 057 return createWalStore(conf, dir); 058 } 059 060 public static WALProcedureStore createWalStore(final Configuration conf, final Path dir) 061 throws IOException { 062 return new WALProcedureStore(conf, dir, null, new LeaseRecovery() { 063 @Override 064 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 065 // no-op 066 } 067 }); 068 } 069 070 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort, 071 boolean startWorkers) throws Exception { 072 restart(procExecutor, false, true, null, null, null, abort, startWorkers); 073 } 074 075 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort) 076 throws Exception { 077 restart(procExecutor, false, true, null, null, null, abort, true); 078 } 079 080 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception { 081 restart(procExecutor, false, true, null, null, null, false, true); 082 } 083 084 public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, 085 boolean abortOnCorruption) throws IOException { 086 initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true); 087 } 088 089 public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, 090 boolean abortOnCorruption, boolean startWorkers) throws IOException { 091 procExecutor.init(numThreads, abortOnCorruption); 092 if (startWorkers) { 093 procExecutor.startWorkers(); 094 } 095 } 096 097 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, 098 boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction, 099 Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception { 100 restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction, 101 actionBeforeStartWorker, startAction, false, true); 102 } 103 104 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, 105 boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction, 106 Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort, 107 boolean startWorkers) throws Exception { 108 final ProcedureStore procStore = procExecutor.getStore(); 109 final int storeThreads = procExecutor.getCorePoolSize(); 110 final int execThreads = procExecutor.getCorePoolSize(); 111 112 final ProcedureExecutor.Testing testing = procExecutor.testing; 113 if (avoidTestKillDuringRestart) { 114 procExecutor.testing = null; 115 } 116 117 // stop 118 LOG.info("RESTART - Stop"); 119 procExecutor.stop(); 120 procStore.stop(abort); 121 if (stopAction != null) { 122 stopAction.call(); 123 } 124 procExecutor.join(); 125 procExecutor.getScheduler().clear(); 126 127 // nothing running... 128 129 // re-start 130 LOG.info("RESTART - Start"); 131 procStore.start(storeThreads); 132 procExecutor.init(execThreads, failOnCorrupted); 133 if (actionBeforeStartWorker != null) { 134 actionBeforeStartWorker.call(); 135 } 136 if (avoidTestKillDuringRestart) { 137 procExecutor.testing = testing; 138 } 139 if (startWorkers) { 140 procExecutor.startWorkers(); 141 } 142 if (startAction != null) { 143 startAction.call(); 144 } 145 } 146 147 public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader) 148 throws Exception { 149 storeRestart(procStore, false, loader); 150 } 151 152 public static void storeRestart(ProcedureStore procStore, boolean abort, 153 ProcedureStore.ProcedureLoader loader) throws Exception { 154 procStore.stop(abort); 155 procStore.start(procStore.getNumThreads()); 156 procStore.recoverLease(); 157 procStore.load(loader); 158 } 159 160 public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId, 161 long runnableCount, int completedCount, int corruptedCount) throws Exception { 162 final LoadCounter loader = new LoadCounter(); 163 storeRestart(procStore, loader); 164 assertEquals(maxProcId, loader.getMaxProcId()); 165 assertEquals(runnableCount, loader.getRunnableCount()); 166 assertEquals(completedCount, loader.getCompletedCount()); 167 assertEquals(corruptedCount, loader.getCorruptedCount()); 168 return loader; 169 } 170 171 private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) { 172 if (procExecutor.testing == null) { 173 procExecutor.testing = new ProcedureExecutor.Testing(); 174 } 175 } 176 177 public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor, 178 boolean value) { 179 createExecutorTesting(procExecutor); 180 procExecutor.testing.killIfHasParent = value; 181 } 182 183 public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor, 184 boolean value) { 185 createExecutorTesting(procExecutor); 186 procExecutor.testing.killIfSuspended = value; 187 } 188 189 public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 190 boolean value) { 191 createExecutorTesting(procExecutor); 192 procExecutor.testing.killBeforeStoreUpdate = value; 193 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); 194 assertSingleExecutorForKillTests(procExecutor); 195 } 196 197 public static <TEnv> void setKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor, 198 boolean value) { 199 createExecutorTesting(procExecutor); 200 procExecutor.testing.killBeforeStoreUpdateInRollback = value; 201 LOG.warn("Set Kill before store update in rollback to: " 202 + procExecutor.testing.killBeforeStoreUpdateInRollback); 203 assertSingleExecutorForKillTests(procExecutor); 204 } 205 206 public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 207 boolean value) { 208 createExecutorTesting(procExecutor); 209 procExecutor.testing.toggleKillBeforeStoreUpdate = value; 210 assertSingleExecutorForKillTests(procExecutor); 211 } 212 213 public static <TEnv> void 214 setToggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor, boolean value) { 215 createExecutorTesting(procExecutor); 216 procExecutor.testing.toggleKillBeforeStoreUpdateInRollback = value; 217 assertSingleExecutorForKillTests(procExecutor); 218 } 219 220 public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { 221 createExecutorTesting(procExecutor); 222 procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate; 223 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); 224 assertSingleExecutorForKillTests(procExecutor); 225 } 226 227 public static <TEnv> void 228 toggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor) { 229 createExecutorTesting(procExecutor); 230 procExecutor.testing.killBeforeStoreUpdateInRollback = 231 !procExecutor.testing.killBeforeStoreUpdateInRollback; 232 LOG.warn("Set Kill before store update to in rollback: " 233 + procExecutor.testing.killBeforeStoreUpdateInRollback); 234 assertSingleExecutorForKillTests(procExecutor); 235 } 236 237 public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { 238 createExecutorTesting(procExecutor); 239 procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate; 240 LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate); 241 assertSingleExecutorForKillTests(procExecutor); 242 } 243 244 public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 245 boolean value) { 246 setKillBeforeStoreUpdate(procExecutor, value); 247 setToggleKillBeforeStoreUpdate(procExecutor, value); 248 assertSingleExecutorForKillTests(procExecutor); 249 } 250 251 public static <TEnv> void setKillAndToggleBeforeStoreUpdateInRollback( 252 ProcedureExecutor<TEnv> procExecutor, boolean value) { 253 setKillBeforeStoreUpdateInRollback(procExecutor, value); 254 setToggleKillBeforeStoreUpdateInRollback(procExecutor, value); 255 assertSingleExecutorForKillTests(procExecutor); 256 } 257 258 private static <TEnv> void 259 assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) { 260 if (procExecutor.testing == null) { 261 return; 262 } 263 264 if ( 265 procExecutor.testing.killBeforeStoreUpdate || procExecutor.testing.toggleKillBeforeStoreUpdate 266 ) { 267 assertEquals("expected only one executor running during test with kill/restart", 1, 268 procExecutor.getCorePoolSize()); 269 } 270 } 271 272 public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc) 273 throws IOException { 274 NoopProcedureStore procStore = new NoopProcedureStore(); 275 ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore); 276 procStore.start(1); 277 initAndStartWorkers(procExecutor, 1, false, true); 278 try { 279 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); 280 } finally { 281 procStore.stop(false); 282 procExecutor.stop(); 283 } 284 } 285 286 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { 287 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); 288 } 289 290 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc, 291 final long nonceGroup, final long nonce) { 292 long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce); 293 waitProcedure(procExecutor, procId); 294 return procId; 295 } 296 297 public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc, 298 final long nonceGroup, final long nonce) { 299 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 300 long procId = procExecutor.registerNonce(nonceKey); 301 assertFalse(procId >= 0); 302 return procExecutor.submitProcedure(proc, nonceKey); 303 } 304 305 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { 306 while (proc.getState() == ProcedureState.INITIALIZING) { 307 Threads.sleepWithoutInterrupt(250); 308 } 309 waitProcedure(procExecutor, proc.getProcId()); 310 } 311 312 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) { 313 while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) { 314 Threads.sleepWithoutInterrupt(250); 315 } 316 } 317 318 public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) { 319 for (int i = 0; i < procIds.length; ++i) { 320 waitProcedure(procExecutor, procIds[i]); 321 } 322 } 323 324 public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) { 325 for (long procId : procExecutor.getActiveProcIds()) { 326 waitProcedure(procExecutor, procId); 327 } 328 } 329 330 public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) { 331 int stableRuns = 0; 332 while (stableRuns < 10) { 333 if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) { 334 stableRuns = 0; 335 Threads.sleepWithoutInterrupt(100); 336 } else { 337 stableRuns++; 338 Threads.sleepWithoutInterrupt(25); 339 } 340 } 341 } 342 343 public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor, 344 long procId) { 345 assertFalse("expected a running proc", procExecutor.isFinished(procId)); 346 assertEquals(null, procExecutor.getResult(procId)); 347 } 348 349 public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) { 350 Procedure<?> result = procExecutor.getResult(procId); 351 assertTrue("expected procedure result", result != null); 352 assertProcNotFailed(result); 353 } 354 355 public static void assertProcNotFailed(final Procedure<?> result) { 356 assertFalse("found exception: " + result.getException(), result.isFailed()); 357 } 358 359 public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor, 360 final long procId) { 361 Procedure<?> result = procExecutor.getResult(procId); 362 assertTrue("expected procedure result", result != null); 363 return assertProcFailed(result); 364 } 365 366 public static Throwable assertProcFailed(final Procedure<?> result) { 367 assertEquals(true, result.isFailed()); 368 LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage()); 369 return getExceptionCause(result); 370 } 371 372 public static void assertIsAbortException(final Procedure<?> result) { 373 Throwable cause = assertProcFailed(result); 374 assertTrue("expected abort exception, got " + cause, 375 cause instanceof ProcedureAbortedException); 376 } 377 378 public static void assertIsTimeoutException(final Procedure<?> result) { 379 Throwable cause = assertProcFailed(result); 380 assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException); 381 } 382 383 public static void assertIsIllegalArgumentException(final Procedure<?> result) { 384 Throwable cause = assertProcFailed(result); 385 assertTrue("expected IllegalArgumentIOException, got " + cause, 386 cause instanceof IllegalArgumentIOException); 387 } 388 389 public static Throwable getExceptionCause(final Procedure<?> procInfo) { 390 assert procInfo.isFailed(); 391 Throwable cause = procInfo.getException().getCause(); 392 return cause == null ? procInfo.getException() : cause; 393 } 394 395 /** 396 * Run through all procedure flow states TWICE while also restarting procedure executor at each 397 * step; i.e force a reread of procedure store. 398 * <p> 399 * It does 400 * <ol> 401 * <li>Execute step N - kill the executor before store update 402 * <li>Restart executor/store 403 * <li>Execute step N - and then save to store 404 * </ol> 405 * <p> 406 * This is a good test for finding state that needs persisting and steps that are not idempotent. 407 */ 408 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 409 final long procId) throws Exception { 410 testRecoveryAndDoubleExecution(procExec, procId, false); 411 } 412 413 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 414 final long procId, final boolean expectFailure) throws Exception { 415 testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null); 416 } 417 418 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 419 final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception { 420 Procedure proc = procExec.getProcedure(procId); 421 waitProcedure(procExec, procId); 422 assertEquals(false, procExec.isRunning()); 423 for (int i = 0; !procExec.isFinished(procId); ++i) { 424 proc = procExec.getProcedure(procId); 425 LOG.info("Restart " + i + " exec state: " + proc); 426 if (customRestart != null) { 427 customRestart.run(); 428 } else { 429 restart(procExec); 430 } 431 waitProcedure(procExec, procId); 432 } 433 434 assertEquals(true, procExec.isRunning()); 435 if (expectFailure) { 436 assertProcFailed(procExec, procId); 437 } else { 438 assertProcNotFailed(procExec, procId); 439 } 440 } 441 442 public static class NoopProcedure<TEnv> extends Procedure<TEnv> { 443 public NoopProcedure() { 444 } 445 446 @Override 447 protected Procedure<TEnv>[] execute(TEnv env) 448 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 449 return null; 450 } 451 452 @Override 453 protected void rollback(TEnv env) throws IOException, InterruptedException { 454 } 455 456 @Override 457 protected boolean abort(TEnv env) { 458 return false; 459 } 460 461 @Override 462 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 463 } 464 465 @Override 466 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 467 } 468 } 469 470 public static class NoopStateMachineProcedure<TEnv, TState> 471 extends StateMachineProcedure<TEnv, TState> { 472 private TState initialState; 473 private TEnv env; 474 475 public NoopStateMachineProcedure() { 476 } 477 478 public NoopStateMachineProcedure(TEnv env, TState initialState) { 479 this.env = env; 480 this.initialState = initialState; 481 } 482 483 @Override 484 protected Flow executeFromState(TEnv env, TState tState) 485 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 486 return null; 487 } 488 489 @Override 490 protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException { 491 492 } 493 494 @Override 495 protected TState getState(int stateId) { 496 return null; 497 } 498 499 @Override 500 protected int getStateId(TState tState) { 501 return 0; 502 } 503 504 @Override 505 protected TState getInitialState() { 506 return initialState; 507 } 508 } 509 510 public static class TestProcedure extends NoopProcedure<Void> { 511 private byte[] data = null; 512 513 public TestProcedure() { 514 } 515 516 public TestProcedure(long procId) { 517 this(procId, 0); 518 } 519 520 public TestProcedure(long procId, long parentId) { 521 this(procId, parentId, null); 522 } 523 524 public TestProcedure(long procId, long parentId, byte[] data) { 525 this(procId, parentId, parentId, data); 526 } 527 528 public TestProcedure(long procId, long parentId, long rootId, byte[] data) { 529 setData(data); 530 setProcId(procId); 531 if (parentId > 0) { 532 setParentProcId(parentId); 533 } 534 if (rootId > 0 || parentId > 0) { 535 setRootProcId(rootId); 536 } 537 } 538 539 public void addStackId(final int index) { 540 addStackIndex(index); 541 } 542 543 public void setSuccessState() { 544 setState(ProcedureState.SUCCESS); 545 } 546 547 public void setData(final byte[] data) { 548 this.data = data; 549 } 550 551 @Override 552 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 553 ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data); 554 BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString); 555 serializer.serialize(builder.build()); 556 } 557 558 @Override 559 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 560 BytesValue bytesValue = serializer.deserialize(BytesValue.class); 561 ByteString dataString = bytesValue.getValue(); 562 563 if (dataString.isEmpty()) { 564 data = null; 565 } else { 566 data = dataString.toByteArray(); 567 } 568 } 569 570 // Mark acquire/release lock functions public for test uses. 571 @Override 572 public LockState acquireLock(Void env) { 573 return LockState.LOCK_ACQUIRED; 574 } 575 576 @Override 577 public void releaseLock(Void env) { 578 // no-op 579 } 580 } 581 582 public static class LoadCounter implements ProcedureStore.ProcedureLoader { 583 private final ArrayList<Procedure> corrupted = new ArrayList<>(); 584 private final ArrayList<Procedure> completed = new ArrayList<>(); 585 private final ArrayList<Procedure> runnable = new ArrayList<>(); 586 587 private Set<Long> procIds; 588 private long maxProcId = 0; 589 590 public LoadCounter() { 591 this(null); 592 } 593 594 public LoadCounter(final Set<Long> procIds) { 595 this.procIds = procIds; 596 } 597 598 public void reset() { 599 reset(null); 600 } 601 602 public void reset(final Set<Long> procIds) { 603 corrupted.clear(); 604 completed.clear(); 605 runnable.clear(); 606 this.procIds = procIds; 607 this.maxProcId = 0; 608 } 609 610 public long getMaxProcId() { 611 return maxProcId; 612 } 613 614 public ArrayList<Procedure> getRunnables() { 615 return runnable; 616 } 617 618 public int getRunnableCount() { 619 return runnable.size(); 620 } 621 622 public ArrayList<Procedure> getCompleted() { 623 return completed; 624 } 625 626 public int getCompletedCount() { 627 return completed.size(); 628 } 629 630 public int getLoadedCount() { 631 return runnable.size() + completed.size(); 632 } 633 634 public ArrayList<Procedure> getCorrupted() { 635 return corrupted; 636 } 637 638 public int getCorruptedCount() { 639 return corrupted.size(); 640 } 641 642 public boolean isRunnable(final long procId) { 643 for (Procedure proc : runnable) { 644 if (proc.getProcId() == procId) { 645 return true; 646 } 647 } 648 return false; 649 } 650 651 @Override 652 public void setMaxProcId(long maxProcId) { 653 this.maxProcId = maxProcId; 654 } 655 656 @Override 657 public void load(ProcedureIterator procIter) throws IOException { 658 while (procIter.hasNext()) { 659 long procId; 660 if (procIter.isNextFinished()) { 661 Procedure<?> proc = procIter.next(); 662 procId = proc.getProcId(); 663 LOG.debug("loading completed procId=" + procId + ": " + proc); 664 completed.add(proc); 665 } else { 666 Procedure proc = procIter.next(); 667 procId = proc.getProcId(); 668 LOG.debug("loading runnable procId=" + procId + ": " + proc); 669 runnable.add(proc); 670 } 671 if (procIds != null) { 672 assertTrue("procId=" + procId + " unexpected", procIds.contains(procId)); 673 } 674 } 675 } 676 677 @Override 678 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 679 while (procIter.hasNext()) { 680 Procedure proc = procIter.next(); 681 LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc); 682 corrupted.add(proc); 683 } 684 } 685 } 686}