001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Set; 027import java.util.concurrent.Callable; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 033import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 034import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 035import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 036import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 037import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 038import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 039import org.apache.hadoop.hbase.util.NonceKey; 040import org.apache.hadoop.hbase.util.Threads; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 045import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue; 046 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 048 049public final class ProcedureTestingUtility { 050 private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class); 051 052 private ProcedureTestingUtility() { 053 } 054 055 public static ProcedureStore createStore(final Configuration conf, final Path dir) 056 throws IOException { 057 return createWalStore(conf, dir); 058 } 059 060 public static WALProcedureStore createWalStore(final Configuration conf, final Path dir) 061 throws IOException { 062 return new WALProcedureStore(conf, dir, null, new LeaseRecovery() { 063 @Override 064 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 065 // no-op 066 } 067 }); 068 } 069 070 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort, 071 boolean startWorkers) throws Exception { 072 restart(procExecutor, false, true, null, null, null, abort, startWorkers); 073 } 074 075 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort) 076 throws Exception { 077 restart(procExecutor, false, true, null, null, null, abort, true); 078 } 079 080 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception { 081 restart(procExecutor, false, true, null, null, null, false, true); 082 } 083 084 public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, 085 boolean abortOnCorruption) throws IOException { 086 initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true); 087 } 088 089 public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, 090 boolean abortOnCorruption, boolean startWorkers) throws IOException { 091 procExecutor.init(numThreads, abortOnCorruption); 092 if (startWorkers) { 093 procExecutor.startWorkers(); 094 } 095 } 096 097 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, 098 boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction, 099 Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception { 100 restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction, 101 actionBeforeStartWorker, startAction, false, true); 102 } 103 104 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, 105 boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction, 106 Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort, 107 boolean startWorkers) throws Exception { 108 final ProcedureStore procStore = procExecutor.getStore(); 109 final int storeThreads = procExecutor.getCorePoolSize(); 110 final int execThreads = procExecutor.getCorePoolSize(); 111 112 final ProcedureExecutor.Testing testing = procExecutor.testing; 113 if (avoidTestKillDuringRestart) { 114 procExecutor.testing = null; 115 } 116 117 // stop 118 LOG.info("RESTART - Stop"); 119 procExecutor.stop(); 120 procStore.stop(abort); 121 if (stopAction != null) { 122 stopAction.call(); 123 } 124 procExecutor.join(); 125 procExecutor.getScheduler().clear(); 126 127 // nothing running... 128 129 // re-start 130 LOG.info("RESTART - Start"); 131 procStore.start(storeThreads); 132 procExecutor.init(execThreads, failOnCorrupted); 133 if (actionBeforeStartWorker != null) { 134 actionBeforeStartWorker.call(); 135 } 136 if (avoidTestKillDuringRestart) { 137 procExecutor.testing = testing; 138 } 139 if (startWorkers) { 140 procExecutor.startWorkers(); 141 } 142 if (startAction != null) { 143 startAction.call(); 144 } 145 } 146 147 public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader) 148 throws Exception { 149 storeRestart(procStore, false, loader); 150 } 151 152 public static void storeRestart(ProcedureStore procStore, boolean abort, 153 ProcedureStore.ProcedureLoader loader) throws Exception { 154 procStore.stop(abort); 155 procStore.start(procStore.getNumThreads()); 156 procStore.recoverLease(); 157 procStore.load(loader); 158 } 159 160 public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId, 161 long runnableCount, int completedCount, int corruptedCount) throws Exception { 162 final LoadCounter loader = new LoadCounter(); 163 storeRestart(procStore, loader); 164 assertEquals(maxProcId, loader.getMaxProcId()); 165 assertEquals(runnableCount, loader.getRunnableCount()); 166 assertEquals(completedCount, loader.getCompletedCount()); 167 assertEquals(corruptedCount, loader.getCorruptedCount()); 168 return loader; 169 } 170 171 private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) { 172 if (procExecutor.testing == null) { 173 procExecutor.testing = new ProcedureExecutor.Testing(); 174 } 175 } 176 177 public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor, 178 boolean value) { 179 createExecutorTesting(procExecutor); 180 procExecutor.testing.killIfHasParent = value; 181 } 182 183 public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor, 184 boolean value) { 185 createExecutorTesting(procExecutor); 186 procExecutor.testing.killIfSuspended = value; 187 } 188 189 public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 190 boolean value) { 191 createExecutorTesting(procExecutor); 192 procExecutor.testing.killBeforeStoreUpdate = value; 193 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); 194 assertSingleExecutorForKillTests(procExecutor); 195 } 196 197 public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 198 boolean value) { 199 createExecutorTesting(procExecutor); 200 procExecutor.testing.toggleKillBeforeStoreUpdate = value; 201 assertSingleExecutorForKillTests(procExecutor); 202 } 203 204 public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { 205 createExecutorTesting(procExecutor); 206 procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate; 207 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); 208 assertSingleExecutorForKillTests(procExecutor); 209 } 210 211 public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { 212 createExecutorTesting(procExecutor); 213 procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate; 214 LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate); 215 assertSingleExecutorForKillTests(procExecutor); 216 } 217 218 public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 219 boolean value) { 220 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value); 221 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value); 222 assertSingleExecutorForKillTests(procExecutor); 223 } 224 225 private static <TEnv> void 226 assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) { 227 if (procExecutor.testing == null) { 228 return; 229 } 230 231 if (procExecutor.testing.killBeforeStoreUpdate || 232 procExecutor.testing.toggleKillBeforeStoreUpdate) { 233 assertEquals("expected only one executor running during test with kill/restart", 1, 234 procExecutor.getCorePoolSize()); 235 } 236 } 237 238 public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc) 239 throws IOException { 240 NoopProcedureStore procStore = new NoopProcedureStore(); 241 ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore); 242 procStore.start(1); 243 initAndStartWorkers(procExecutor, 1, false, true); 244 try { 245 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); 246 } finally { 247 procStore.stop(false); 248 procExecutor.stop(); 249 } 250 } 251 252 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { 253 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); 254 } 255 256 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc, 257 final long nonceGroup, final long nonce) { 258 long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce); 259 waitProcedure(procExecutor, procId); 260 return procId; 261 } 262 263 public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc, 264 final long nonceGroup, final long nonce) { 265 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 266 long procId = procExecutor.registerNonce(nonceKey); 267 assertFalse(procId >= 0); 268 return procExecutor.submitProcedure(proc, nonceKey); 269 } 270 271 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { 272 while (proc.getState() == ProcedureState.INITIALIZING) { 273 Threads.sleepWithoutInterrupt(250); 274 } 275 waitProcedure(procExecutor, proc.getProcId()); 276 } 277 278 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) { 279 while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) { 280 Threads.sleepWithoutInterrupt(250); 281 } 282 } 283 284 public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) { 285 for (int i = 0; i < procIds.length; ++i) { 286 waitProcedure(procExecutor, procIds[i]); 287 } 288 } 289 290 public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) { 291 for (long procId : procExecutor.getActiveProcIds()) { 292 waitProcedure(procExecutor, procId); 293 } 294 } 295 296 public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) { 297 int stableRuns = 0; 298 while (stableRuns < 10) { 299 if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) { 300 stableRuns = 0; 301 Threads.sleepWithoutInterrupt(100); 302 } else { 303 stableRuns++; 304 Threads.sleepWithoutInterrupt(25); 305 } 306 } 307 } 308 309 public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor, 310 long procId) { 311 assertFalse("expected a running proc", procExecutor.isFinished(procId)); 312 assertEquals(null, procExecutor.getResult(procId)); 313 } 314 315 public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) { 316 Procedure<?> result = procExecutor.getResult(procId); 317 assertTrue("expected procedure result", result != null); 318 assertProcNotFailed(result); 319 } 320 321 public static void assertProcNotFailed(final Procedure<?> result) { 322 assertFalse("found exception: " + result.getException(), result.isFailed()); 323 } 324 325 public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor, 326 final long procId) { 327 Procedure<?> result = procExecutor.getResult(procId); 328 assertTrue("expected procedure result", result != null); 329 return assertProcFailed(result); 330 } 331 332 public static Throwable assertProcFailed(final Procedure<?> result) { 333 assertEquals(true, result.isFailed()); 334 LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage()); 335 return getExceptionCause(result); 336 } 337 338 public static void assertIsAbortException(final Procedure<?> result) { 339 Throwable cause = assertProcFailed(result); 340 assertTrue("expected abort exception, got " + cause, 341 cause instanceof ProcedureAbortedException); 342 } 343 344 public static void assertIsTimeoutException(final Procedure<?> result) { 345 Throwable cause = assertProcFailed(result); 346 assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException); 347 } 348 349 public static void assertIsIllegalArgumentException(final Procedure<?> result) { 350 Throwable cause = assertProcFailed(result); 351 assertTrue("expected IllegalArgumentIOException, got " + cause, 352 cause instanceof IllegalArgumentIOException); 353 } 354 355 public static Throwable getExceptionCause(final Procedure<?> procInfo) { 356 assert procInfo.isFailed(); 357 Throwable cause = procInfo.getException().getCause(); 358 return cause == null ? procInfo.getException() : cause; 359 } 360 361 /** 362 * Run through all procedure flow states TWICE while also restarting procedure executor at each 363 * step; i.e force a reread of procedure store. 364 * <p> 365 * It does 366 * <ol> 367 * <li>Execute step N - kill the executor before store update 368 * <li>Restart executor/store 369 * <li>Execute step N - and then save to store 370 * </ol> 371 * <p> 372 * This is a good test for finding state that needs persisting and steps that are not idempotent. 373 */ 374 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 375 final long procId) throws Exception { 376 testRecoveryAndDoubleExecution(procExec, procId, false); 377 } 378 379 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 380 final long procId, final boolean expectFailure) throws Exception { 381 testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null); 382 } 383 384 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 385 final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception { 386 Procedure proc = procExec.getProcedure(procId); 387 waitProcedure(procExec, procId); 388 assertEquals(false, procExec.isRunning()); 389 for (int i = 0; !procExec.isFinished(procId); ++i) { 390 proc = procExec.getProcedure(procId); 391 LOG.info("Restart " + i + " exec state: " + proc); 392 if (customRestart != null) { 393 customRestart.run(); 394 } else { 395 restart(procExec); 396 } 397 waitProcedure(procExec, procId); 398 } 399 400 assertEquals(true, procExec.isRunning()); 401 if (expectFailure) { 402 assertProcFailed(procExec, procId); 403 } else { 404 assertProcNotFailed(procExec, procId); 405 } 406 } 407 408 public static class NoopProcedure<TEnv> extends Procedure<TEnv> { 409 public NoopProcedure() { 410 } 411 412 @Override 413 protected Procedure<TEnv>[] execute(TEnv env) 414 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 415 return null; 416 } 417 418 @Override 419 protected void rollback(TEnv env) throws IOException, InterruptedException { 420 } 421 422 @Override 423 protected boolean abort(TEnv env) { 424 return false; 425 } 426 427 @Override 428 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 429 } 430 431 @Override 432 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 433 } 434 } 435 436 public static class NoopStateMachineProcedure<TEnv, TState> 437 extends StateMachineProcedure<TEnv, TState> { 438 private TState initialState; 439 private TEnv env; 440 441 public NoopStateMachineProcedure() { 442 } 443 444 public NoopStateMachineProcedure(TEnv env, TState initialState) { 445 this.env = env; 446 this.initialState = initialState; 447 } 448 449 @Override 450 protected Flow executeFromState(TEnv env, TState tState) 451 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 452 return null; 453 } 454 455 @Override 456 protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException { 457 458 } 459 460 @Override 461 protected TState getState(int stateId) { 462 return null; 463 } 464 465 @Override 466 protected int getStateId(TState tState) { 467 return 0; 468 } 469 470 @Override 471 protected TState getInitialState() { 472 return initialState; 473 } 474 } 475 476 public static class TestProcedure extends NoopProcedure<Void> { 477 private byte[] data = null; 478 479 public TestProcedure() { 480 } 481 482 public TestProcedure(long procId) { 483 this(procId, 0); 484 } 485 486 public TestProcedure(long procId, long parentId) { 487 this(procId, parentId, null); 488 } 489 490 public TestProcedure(long procId, long parentId, byte[] data) { 491 this(procId, parentId, parentId, data); 492 } 493 494 public TestProcedure(long procId, long parentId, long rootId, byte[] data) { 495 setData(data); 496 setProcId(procId); 497 if (parentId > 0) { 498 setParentProcId(parentId); 499 } 500 if (rootId > 0 || parentId > 0) { 501 setRootProcId(rootId); 502 } 503 } 504 505 public void addStackId(final int index) { 506 addStackIndex(index); 507 } 508 509 public void setSuccessState() { 510 setState(ProcedureState.SUCCESS); 511 } 512 513 public void setData(final byte[] data) { 514 this.data = data; 515 } 516 517 @Override 518 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 519 ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data); 520 BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString); 521 serializer.serialize(builder.build()); 522 } 523 524 @Override 525 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 526 BytesValue bytesValue = serializer.deserialize(BytesValue.class); 527 ByteString dataString = bytesValue.getValue(); 528 529 if (dataString.isEmpty()) { 530 data = null; 531 } else { 532 data = dataString.toByteArray(); 533 } 534 } 535 536 // Mark acquire/release lock functions public for test uses. 537 @Override 538 public LockState acquireLock(Void env) { 539 return LockState.LOCK_ACQUIRED; 540 } 541 542 @Override 543 public void releaseLock(Void env) { 544 // no-op 545 } 546 } 547 548 public static class LoadCounter implements ProcedureStore.ProcedureLoader { 549 private final ArrayList<Procedure> corrupted = new ArrayList<>(); 550 private final ArrayList<Procedure> completed = new ArrayList<>(); 551 private final ArrayList<Procedure> runnable = new ArrayList<>(); 552 553 private Set<Long> procIds; 554 private long maxProcId = 0; 555 556 public LoadCounter() { 557 this(null); 558 } 559 560 public LoadCounter(final Set<Long> procIds) { 561 this.procIds = procIds; 562 } 563 564 public void reset() { 565 reset(null); 566 } 567 568 public void reset(final Set<Long> procIds) { 569 corrupted.clear(); 570 completed.clear(); 571 runnable.clear(); 572 this.procIds = procIds; 573 this.maxProcId = 0; 574 } 575 576 public long getMaxProcId() { 577 return maxProcId; 578 } 579 580 public ArrayList<Procedure> getRunnables() { 581 return runnable; 582 } 583 584 public int getRunnableCount() { 585 return runnable.size(); 586 } 587 588 public ArrayList<Procedure> getCompleted() { 589 return completed; 590 } 591 592 public int getCompletedCount() { 593 return completed.size(); 594 } 595 596 public int getLoadedCount() { 597 return runnable.size() + completed.size(); 598 } 599 600 public ArrayList<Procedure> getCorrupted() { 601 return corrupted; 602 } 603 604 public int getCorruptedCount() { 605 return corrupted.size(); 606 } 607 608 public boolean isRunnable(final long procId) { 609 for (Procedure proc : runnable) { 610 if (proc.getProcId() == procId) { 611 return true; 612 } 613 } 614 return false; 615 } 616 617 @Override 618 public void setMaxProcId(long maxProcId) { 619 this.maxProcId = maxProcId; 620 } 621 622 @Override 623 public void load(ProcedureIterator procIter) throws IOException { 624 while (procIter.hasNext()) { 625 long procId; 626 if (procIter.isNextFinished()) { 627 Procedure<?> proc = procIter.next(); 628 procId = proc.getProcId(); 629 LOG.debug("loading completed procId=" + procId + ": " + proc); 630 completed.add(proc); 631 } else { 632 Procedure proc = procIter.next(); 633 procId = proc.getProcId(); 634 LOG.debug("loading runnable procId=" + procId + ": " + proc); 635 runnable.add(proc); 636 } 637 if (procIds != null) { 638 assertTrue("procId=" + procId + " unexpected", procIds.contains(procId)); 639 } 640 } 641 } 642 643 @Override 644 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 645 while (procIter.hasNext()) { 646 Procedure proc = procIter.next(); 647 LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc); 648 corrupted.add(proc); 649 } 650 } 651 } 652}