001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertInstanceOf; 023import static org.junit.jupiter.api.Assertions.assertNotNull; 024import static org.junit.jupiter.api.Assertions.assertNull; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Set; 030import java.util.concurrent.Callable; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.hadoop.hbase.HConstants; 035import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 036import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 037import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 038import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 039import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 040import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 041import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 042import org.apache.hadoop.hbase.util.NonceKey; 043import org.apache.hadoop.hbase.util.Threads; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 048import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue; 049 050import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 051 052public final class ProcedureTestingUtility { 053 private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class); 054 055 private ProcedureTestingUtility() { 056 } 057 058 public static ProcedureStore createStore(final Configuration conf, final Path dir) 059 throws IOException { 060 return createWalStore(conf, dir); 061 } 062 063 public static WALProcedureStore createWalStore(final Configuration conf, final Path dir) 064 throws IOException { 065 return new WALProcedureStore(conf, dir, null, new LeaseRecovery() { 066 @Override 067 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 068 // no-op 069 } 070 }); 071 } 072 073 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort, 074 boolean startWorkers) throws Exception { 075 restart(procExecutor, false, true, null, null, null, abort, startWorkers); 076 } 077 078 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort) 079 throws Exception { 080 restart(procExecutor, false, true, null, null, null, abort, true); 081 } 082 083 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception { 084 restart(procExecutor, false, true, null, null, null, false, true); 085 } 086 087 public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, 088 boolean abortOnCorruption) throws IOException { 089 initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true); 090 } 091 092 public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, 093 boolean abortOnCorruption, boolean startWorkers) throws IOException { 094 procExecutor.init(numThreads, abortOnCorruption); 095 if (startWorkers) { 096 procExecutor.startWorkers(); 097 } 098 } 099 100 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, 101 boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction, 102 Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception { 103 restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction, 104 actionBeforeStartWorker, startAction, false, true); 105 } 106 107 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, 108 boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction, 109 Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort, 110 boolean startWorkers) throws Exception { 111 final ProcedureStore procStore = procExecutor.getStore(); 112 final int storeThreads = procExecutor.getCorePoolSize(); 113 final int execThreads = procExecutor.getCorePoolSize(); 114 115 final ProcedureExecutor.Testing testing = procExecutor.testing; 116 if (avoidTestKillDuringRestart) { 117 procExecutor.testing = null; 118 } 119 120 // stop 121 LOG.info("RESTART - Stop"); 122 procExecutor.stop(); 123 procStore.stop(abort); 124 if (stopAction != null) { 125 stopAction.call(); 126 } 127 procExecutor.join(); 128 procExecutor.getScheduler().clear(); 129 130 // nothing running... 131 132 // re-start 133 LOG.info("RESTART - Start"); 134 procStore.start(storeThreads); 135 procExecutor.init(execThreads, failOnCorrupted); 136 if (actionBeforeStartWorker != null) { 137 actionBeforeStartWorker.call(); 138 } 139 if (avoidTestKillDuringRestart) { 140 procExecutor.testing = testing; 141 } 142 if (startWorkers) { 143 procExecutor.startWorkers(); 144 } 145 if (startAction != null) { 146 startAction.call(); 147 } 148 } 149 150 public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader) 151 throws Exception { 152 storeRestart(procStore, false, loader); 153 } 154 155 public static void storeRestart(ProcedureStore procStore, boolean abort, 156 ProcedureStore.ProcedureLoader loader) throws Exception { 157 procStore.stop(abort); 158 procStore.start(procStore.getNumThreads()); 159 procStore.recoverLease(); 160 procStore.load(loader); 161 } 162 163 public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId, 164 long runnableCount, int completedCount, int corruptedCount) throws Exception { 165 final LoadCounter loader = new LoadCounter(); 166 storeRestart(procStore, loader); 167 assertEquals(maxProcId, loader.getMaxProcId()); 168 assertEquals(runnableCount, loader.getRunnableCount()); 169 assertEquals(completedCount, loader.getCompletedCount()); 170 assertEquals(corruptedCount, loader.getCorruptedCount()); 171 return loader; 172 } 173 174 private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) { 175 if (procExecutor.testing == null) { 176 procExecutor.testing = new ProcedureExecutor.Testing(); 177 } 178 } 179 180 public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor, 181 boolean value) { 182 createExecutorTesting(procExecutor); 183 procExecutor.testing.killIfHasParent = value; 184 } 185 186 public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor, 187 boolean value) { 188 createExecutorTesting(procExecutor); 189 procExecutor.testing.killIfSuspended = value; 190 } 191 192 public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 193 boolean value) { 194 createExecutorTesting(procExecutor); 195 procExecutor.testing.killBeforeStoreUpdate = value; 196 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); 197 assertSingleExecutorForKillTests(procExecutor); 198 } 199 200 public static <TEnv> void setKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor, 201 boolean value) { 202 createExecutorTesting(procExecutor); 203 procExecutor.testing.killBeforeStoreUpdateInRollback = value; 204 LOG.warn("Set Kill before store update in rollback to: " 205 + procExecutor.testing.killBeforeStoreUpdateInRollback); 206 assertSingleExecutorForKillTests(procExecutor); 207 } 208 209 public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 210 boolean value) { 211 createExecutorTesting(procExecutor); 212 procExecutor.testing.toggleKillBeforeStoreUpdate = value; 213 assertSingleExecutorForKillTests(procExecutor); 214 } 215 216 public static <TEnv> void 217 setToggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor, boolean value) { 218 createExecutorTesting(procExecutor); 219 procExecutor.testing.toggleKillBeforeStoreUpdateInRollback = value; 220 assertSingleExecutorForKillTests(procExecutor); 221 } 222 223 public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { 224 createExecutorTesting(procExecutor); 225 procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate; 226 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); 227 assertSingleExecutorForKillTests(procExecutor); 228 } 229 230 public static <TEnv> void 231 toggleKillBeforeStoreUpdateInRollback(ProcedureExecutor<TEnv> procExecutor) { 232 createExecutorTesting(procExecutor); 233 procExecutor.testing.killBeforeStoreUpdateInRollback = 234 !procExecutor.testing.killBeforeStoreUpdateInRollback; 235 LOG.warn("Set Kill before store update to in rollback: " 236 + procExecutor.testing.killBeforeStoreUpdateInRollback); 237 assertSingleExecutorForKillTests(procExecutor); 238 } 239 240 public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { 241 createExecutorTesting(procExecutor); 242 procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate; 243 LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate); 244 assertSingleExecutorForKillTests(procExecutor); 245 } 246 247 public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 248 boolean value) { 249 setKillBeforeStoreUpdate(procExecutor, value); 250 setToggleKillBeforeStoreUpdate(procExecutor, value); 251 assertSingleExecutorForKillTests(procExecutor); 252 } 253 254 public static <TEnv> void setKillAndToggleBeforeStoreUpdateInRollback( 255 ProcedureExecutor<TEnv> procExecutor, boolean value) { 256 setKillBeforeStoreUpdateInRollback(procExecutor, value); 257 setToggleKillBeforeStoreUpdateInRollback(procExecutor, value); 258 assertSingleExecutorForKillTests(procExecutor); 259 } 260 261 private static <TEnv> void 262 assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) { 263 if (procExecutor.testing == null) { 264 return; 265 } 266 267 if ( 268 procExecutor.testing.killBeforeStoreUpdate || procExecutor.testing.toggleKillBeforeStoreUpdate 269 ) { 270 assertEquals(1, procExecutor.getCorePoolSize(), 271 "expected only one executor running during test with kill/restart"); 272 } 273 } 274 275 public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc) 276 throws IOException { 277 NoopProcedureStore procStore = new NoopProcedureStore(); 278 ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore); 279 procStore.start(1); 280 initAndStartWorkers(procExecutor, 1, false, true); 281 try { 282 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); 283 } finally { 284 procStore.stop(false); 285 procExecutor.stop(); 286 } 287 } 288 289 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { 290 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); 291 } 292 293 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc, 294 final long nonceGroup, final long nonce) { 295 long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce); 296 waitProcedure(procExecutor, procId); 297 return procId; 298 } 299 300 public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc, 301 final long nonceGroup, final long nonce) { 302 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 303 long procId = procExecutor.registerNonce(nonceKey); 304 assertFalse(procId >= 0); 305 return procExecutor.submitProcedure(proc, nonceKey); 306 } 307 308 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { 309 while (proc.getState() == ProcedureState.INITIALIZING) { 310 Threads.sleepWithoutInterrupt(250); 311 } 312 waitProcedure(procExecutor, proc.getProcId()); 313 } 314 315 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) { 316 while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) { 317 Threads.sleepWithoutInterrupt(250); 318 } 319 } 320 321 public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) { 322 for (int i = 0; i < procIds.length; ++i) { 323 waitProcedure(procExecutor, procIds[i]); 324 } 325 } 326 327 public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) { 328 for (long procId : procExecutor.getActiveProcIds()) { 329 waitProcedure(procExecutor, procId); 330 } 331 } 332 333 public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) { 334 int stableRuns = 0; 335 while (stableRuns < 10) { 336 if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) { 337 stableRuns = 0; 338 Threads.sleepWithoutInterrupt(100); 339 } else { 340 stableRuns++; 341 Threads.sleepWithoutInterrupt(25); 342 } 343 } 344 } 345 346 public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor, 347 long procId) { 348 assertFalse(procExecutor.isFinished(procId), "expected a running proc"); 349 assertNull(procExecutor.getResult(procId)); 350 } 351 352 public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) { 353 Procedure<?> result = procExecutor.getResult(procId); 354 assertNotNull(result, "expected procedure result"); 355 assertProcNotFailed(result); 356 } 357 358 public static void assertProcNotFailed(final Procedure<?> result) { 359 assertFalse(result.isFailed(), "found exception: " + result.getException()); 360 } 361 362 public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor, 363 final long procId) { 364 Procedure<?> result = procExecutor.getResult(procId); 365 assertNotNull(result, "expected procedure result"); 366 return assertProcFailed(result); 367 } 368 369 public static Throwable assertProcFailed(final Procedure<?> result) { 370 assertTrue(result.isFailed()); 371 LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage()); 372 return getExceptionCause(result); 373 } 374 375 public static void assertIsAbortException(final Procedure<?> result) { 376 Throwable cause = assertProcFailed(result); 377 assertInstanceOf(ProcedureAbortedException.class, cause, 378 "expected abort exception, got " + cause); 379 } 380 381 public static void assertIsTimeoutException(final Procedure<?> result) { 382 Throwable cause = assertProcFailed(result); 383 assertInstanceOf(TimeoutIOException.class, cause, "expected TimeoutIOException, got " + cause); 384 } 385 386 public static void assertIsIllegalArgumentException(final Procedure<?> result) { 387 Throwable cause = assertProcFailed(result); 388 assertInstanceOf(IllegalArgumentIOException.class, cause, 389 "expected IllegalArgumentIOException, got " + cause); 390 } 391 392 public static Throwable getExceptionCause(final Procedure<?> procInfo) { 393 assert procInfo.isFailed(); 394 Throwable cause = procInfo.getException().getCause(); 395 return cause == null ? procInfo.getException() : cause; 396 } 397 398 /** 399 * Run through all procedure flow states TWICE while also restarting procedure executor at each 400 * step; i.e force a reread of procedure store. 401 * <p> 402 * It does 403 * <ol> 404 * <li>Execute step N - kill the executor before store update 405 * <li>Restart executor/store 406 * <li>Execute step N - and then save to store 407 * </ol> 408 * <p> 409 * This is a good test for finding state that needs persisting and steps that are not idempotent. 410 */ 411 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 412 final long procId) throws Exception { 413 testRecoveryAndDoubleExecution(procExec, procId, false); 414 } 415 416 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 417 final long procId, final boolean expectFailure) throws Exception { 418 testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null); 419 } 420 421 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 422 final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception { 423 Procedure proc = procExec.getProcedure(procId); 424 waitProcedure(procExec, procId); 425 assertEquals(false, procExec.isRunning()); 426 for (int i = 0; !procExec.isFinished(procId); ++i) { 427 proc = procExec.getProcedure(procId); 428 LOG.info("Restart " + i + " exec state: " + proc); 429 if (customRestart != null) { 430 customRestart.run(); 431 } else { 432 restart(procExec); 433 } 434 waitProcedure(procExec, procId); 435 } 436 437 assertEquals(true, procExec.isRunning()); 438 if (expectFailure) { 439 assertProcFailed(procExec, procId); 440 } else { 441 assertProcNotFailed(procExec, procId); 442 } 443 } 444 445 public static class NoopProcedure<TEnv> extends Procedure<TEnv> { 446 public NoopProcedure() { 447 } 448 449 @Override 450 protected Procedure<TEnv>[] execute(TEnv env) 451 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 452 return null; 453 } 454 455 @Override 456 protected void rollback(TEnv env) throws IOException, InterruptedException { 457 } 458 459 @Override 460 protected boolean abort(TEnv env) { 461 return false; 462 } 463 464 @Override 465 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 466 } 467 468 @Override 469 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 470 } 471 } 472 473 public static class NoopStateMachineProcedure<TEnv, TState> 474 extends StateMachineProcedure<TEnv, TState> { 475 private TState initialState; 476 private TEnv env; 477 478 public NoopStateMachineProcedure() { 479 } 480 481 public NoopStateMachineProcedure(TEnv env, TState initialState) { 482 this.env = env; 483 this.initialState = initialState; 484 } 485 486 @Override 487 protected Flow executeFromState(TEnv env, TState tState) 488 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 489 return null; 490 } 491 492 @Override 493 protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException { 494 495 } 496 497 @Override 498 protected TState getState(int stateId) { 499 return null; 500 } 501 502 @Override 503 protected int getStateId(TState tState) { 504 return 0; 505 } 506 507 @Override 508 protected TState getInitialState() { 509 return initialState; 510 } 511 } 512 513 public static class TestProcedure extends NoopProcedure<Void> { 514 private byte[] data = null; 515 516 public TestProcedure() { 517 } 518 519 public TestProcedure(long procId) { 520 this(procId, 0); 521 } 522 523 public TestProcedure(long procId, long parentId) { 524 this(procId, parentId, null); 525 } 526 527 public TestProcedure(long procId, long parentId, byte[] data) { 528 this(procId, parentId, parentId, data); 529 } 530 531 public TestProcedure(long procId, long parentId, long rootId, byte[] data) { 532 setData(data); 533 setProcId(procId); 534 if (parentId > 0) { 535 setParentProcId(parentId); 536 } 537 if (rootId > 0 || parentId > 0) { 538 setRootProcId(rootId); 539 } 540 } 541 542 public void addStackId(final int index) { 543 addStackIndex(index); 544 } 545 546 public void setSuccessState() { 547 setState(ProcedureState.SUCCESS); 548 } 549 550 public void setData(final byte[] data) { 551 this.data = data; 552 } 553 554 @Override 555 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 556 ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data); 557 BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString); 558 serializer.serialize(builder.build()); 559 } 560 561 @Override 562 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 563 BytesValue bytesValue = serializer.deserialize(BytesValue.class); 564 ByteString dataString = bytesValue.getValue(); 565 566 if (dataString.isEmpty()) { 567 data = null; 568 } else { 569 data = dataString.toByteArray(); 570 } 571 } 572 573 // Mark acquire/release lock functions public for test uses. 574 @Override 575 public LockState acquireLock(Void env) { 576 return LockState.LOCK_ACQUIRED; 577 } 578 579 @Override 580 public void releaseLock(Void env) { 581 // no-op 582 } 583 } 584 585 public static class LoadCounter implements ProcedureStore.ProcedureLoader { 586 private final ArrayList<Procedure> corrupted = new ArrayList<>(); 587 private final ArrayList<Procedure> completed = new ArrayList<>(); 588 private final ArrayList<Procedure> runnable = new ArrayList<>(); 589 590 private Set<Long> procIds; 591 private long maxProcId = 0; 592 593 public LoadCounter() { 594 this(null); 595 } 596 597 public LoadCounter(final Set<Long> procIds) { 598 this.procIds = procIds; 599 } 600 601 public void reset() { 602 reset(null); 603 } 604 605 public void reset(final Set<Long> procIds) { 606 corrupted.clear(); 607 completed.clear(); 608 runnable.clear(); 609 this.procIds = procIds; 610 this.maxProcId = 0; 611 } 612 613 public long getMaxProcId() { 614 return maxProcId; 615 } 616 617 public ArrayList<Procedure> getRunnables() { 618 return runnable; 619 } 620 621 public int getRunnableCount() { 622 return runnable.size(); 623 } 624 625 public ArrayList<Procedure> getCompleted() { 626 return completed; 627 } 628 629 public int getCompletedCount() { 630 return completed.size(); 631 } 632 633 public int getLoadedCount() { 634 return runnable.size() + completed.size(); 635 } 636 637 public ArrayList<Procedure> getCorrupted() { 638 return corrupted; 639 } 640 641 public int getCorruptedCount() { 642 return corrupted.size(); 643 } 644 645 public boolean isRunnable(final long procId) { 646 for (Procedure proc : runnable) { 647 if (proc.getProcId() == procId) { 648 return true; 649 } 650 } 651 return false; 652 } 653 654 @Override 655 public void setMaxProcId(long maxProcId) { 656 this.maxProcId = maxProcId; 657 } 658 659 @Override 660 public void load(ProcedureIterator procIter) throws IOException { 661 while (procIter.hasNext()) { 662 long procId; 663 if (procIter.isNextFinished()) { 664 Procedure<?> proc = procIter.next(); 665 procId = proc.getProcId(); 666 LOG.debug("loading completed procId=" + procId + ": " + proc); 667 completed.add(proc); 668 } else { 669 Procedure proc = procIter.next(); 670 procId = proc.getProcId(); 671 LOG.debug("loading runnable procId=" + procId + ": " + proc); 672 runnable.add(proc); 673 } 674 if (procIds != null) { 675 assertTrue(procIds.contains(procId), "procId=" + procId + " unexpected"); 676 } 677 } 678 } 679 680 @Override 681 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 682 while (procIter.hasNext()) { 683 Procedure proc = procIter.next(); 684 LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc); 685 corrupted.add(proc); 686 } 687 } 688 } 689}