001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Set; 027import java.util.concurrent.Callable; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 033import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 034import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; 035import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 036import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 037import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 038import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 039import org.apache.hadoop.hbase.util.NonceKey; 040import org.apache.hadoop.hbase.util.Threads; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 045import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue; 046 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 048 049public final class ProcedureTestingUtility { 050 private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class); 051 052 private ProcedureTestingUtility() { 053 } 054 055 public static ProcedureStore createStore(final Configuration conf, final Path dir) 056 throws IOException { 057 return createWalStore(conf, dir); 058 } 059 060 public static WALProcedureStore createWalStore(final Configuration conf, final Path dir) 061 throws IOException { 062 return new WALProcedureStore(conf, dir, null, new LeaseRecovery() { 063 @Override 064 public void recoverFileLease(FileSystem fs, Path path) throws IOException { 065 // no-op 066 } 067 }); 068 } 069 070 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort, 071 boolean startWorkers) throws Exception { 072 restart(procExecutor, false, true, null, null, null, abort, startWorkers); 073 } 074 075 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort) 076 throws Exception { 077 restart(procExecutor, false, true, null, null, null, abort, true); 078 } 079 080 public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception { 081 restart(procExecutor, false, true, null, null, null, false, true); 082 } 083 084 public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, 085 boolean abortOnCorruption) throws IOException { 086 initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true); 087 } 088 089 public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, 090 boolean abortOnCorruption, boolean startWorkers) throws IOException { 091 procExecutor.init(numThreads, abortOnCorruption); 092 if (startWorkers) { 093 procExecutor.startWorkers(); 094 } 095 } 096 097 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, 098 boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction, 099 Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception { 100 restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction, 101 actionBeforeStartWorker, startAction, false, true); 102 } 103 104 public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, 105 boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction, 106 Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort, 107 boolean startWorkers) throws Exception { 108 final ProcedureStore procStore = procExecutor.getStore(); 109 final int storeThreads = procExecutor.getCorePoolSize(); 110 final int execThreads = procExecutor.getCorePoolSize(); 111 112 final ProcedureExecutor.Testing testing = procExecutor.testing; 113 if (avoidTestKillDuringRestart) { 114 procExecutor.testing = null; 115 } 116 117 // stop 118 LOG.info("RESTART - Stop"); 119 procExecutor.stop(); 120 procStore.stop(abort); 121 if (stopAction != null) { 122 stopAction.call(); 123 } 124 procExecutor.join(); 125 procExecutor.getScheduler().clear(); 126 127 // nothing running... 128 129 // re-start 130 LOG.info("RESTART - Start"); 131 procStore.start(storeThreads); 132 procExecutor.init(execThreads, failOnCorrupted); 133 if (actionBeforeStartWorker != null) { 134 actionBeforeStartWorker.call(); 135 } 136 if (avoidTestKillDuringRestart) { 137 procExecutor.testing = testing; 138 } 139 if (startWorkers) { 140 procExecutor.startWorkers(); 141 } 142 if (startAction != null) { 143 startAction.call(); 144 } 145 } 146 147 public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader) 148 throws Exception { 149 storeRestart(procStore, false, loader); 150 } 151 152 public static void storeRestart(ProcedureStore procStore, boolean abort, 153 ProcedureStore.ProcedureLoader loader) throws Exception { 154 procStore.stop(abort); 155 procStore.start(procStore.getNumThreads()); 156 procStore.recoverLease(); 157 procStore.load(loader); 158 } 159 160 public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId, 161 long runnableCount, int completedCount, int corruptedCount) throws Exception { 162 final LoadCounter loader = new LoadCounter(); 163 storeRestart(procStore, loader); 164 assertEquals(maxProcId, loader.getMaxProcId()); 165 assertEquals(runnableCount, loader.getRunnableCount()); 166 assertEquals(completedCount, loader.getCompletedCount()); 167 assertEquals(corruptedCount, loader.getCorruptedCount()); 168 return loader; 169 } 170 171 private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) { 172 if (procExecutor.testing == null) { 173 procExecutor.testing = new ProcedureExecutor.Testing(); 174 } 175 } 176 177 public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor, 178 boolean value) { 179 createExecutorTesting(procExecutor); 180 procExecutor.testing.killIfHasParent = value; 181 } 182 183 public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor, 184 boolean value) { 185 createExecutorTesting(procExecutor); 186 procExecutor.testing.killIfSuspended = value; 187 } 188 189 public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 190 boolean value) { 191 createExecutorTesting(procExecutor); 192 procExecutor.testing.killBeforeStoreUpdate = value; 193 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); 194 assertSingleExecutorForKillTests(procExecutor); 195 } 196 197 public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 198 boolean value) { 199 createExecutorTesting(procExecutor); 200 procExecutor.testing.toggleKillBeforeStoreUpdate = value; 201 assertSingleExecutorForKillTests(procExecutor); 202 } 203 204 public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { 205 createExecutorTesting(procExecutor); 206 procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate; 207 LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate); 208 assertSingleExecutorForKillTests(procExecutor); 209 } 210 211 public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) { 212 createExecutorTesting(procExecutor); 213 procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate; 214 LOG.warn("Set Kill after store update to: " + procExecutor.testing.killAfterStoreUpdate); 215 assertSingleExecutorForKillTests(procExecutor); 216 } 217 218 public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor, 219 boolean value) { 220 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value); 221 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value); 222 assertSingleExecutorForKillTests(procExecutor); 223 } 224 225 private static <TEnv> void 226 assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) { 227 if (procExecutor.testing == null) { 228 return; 229 } 230 231 if ( 232 procExecutor.testing.killBeforeStoreUpdate || procExecutor.testing.toggleKillBeforeStoreUpdate 233 ) { 234 assertEquals("expected only one executor running during test with kill/restart", 1, 235 procExecutor.getCorePoolSize()); 236 } 237 } 238 239 public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc) 240 throws IOException { 241 NoopProcedureStore procStore = new NoopProcedureStore(); 242 ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore); 243 procStore.start(1); 244 initAndStartWorkers(procExecutor, 1, false, true); 245 try { 246 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); 247 } finally { 248 procStore.stop(false); 249 procExecutor.stop(); 250 } 251 } 252 253 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { 254 return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); 255 } 256 257 public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc, 258 final long nonceGroup, final long nonce) { 259 long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce); 260 waitProcedure(procExecutor, procId); 261 return procId; 262 } 263 264 public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc, 265 final long nonceGroup, final long nonce) { 266 final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce); 267 long procId = procExecutor.registerNonce(nonceKey); 268 assertFalse(procId >= 0); 269 return procExecutor.submitProcedure(proc, nonceKey); 270 } 271 272 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { 273 while (proc.getState() == ProcedureState.INITIALIZING) { 274 Threads.sleepWithoutInterrupt(250); 275 } 276 waitProcedure(procExecutor, proc.getProcId()); 277 } 278 279 public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) { 280 while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) { 281 Threads.sleepWithoutInterrupt(250); 282 } 283 } 284 285 public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) { 286 for (int i = 0; i < procIds.length; ++i) { 287 waitProcedure(procExecutor, procIds[i]); 288 } 289 } 290 291 public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) { 292 for (long procId : procExecutor.getActiveProcIds()) { 293 waitProcedure(procExecutor, procId); 294 } 295 } 296 297 public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) { 298 int stableRuns = 0; 299 while (stableRuns < 10) { 300 if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) { 301 stableRuns = 0; 302 Threads.sleepWithoutInterrupt(100); 303 } else { 304 stableRuns++; 305 Threads.sleepWithoutInterrupt(25); 306 } 307 } 308 } 309 310 public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor, 311 long procId) { 312 assertFalse("expected a running proc", procExecutor.isFinished(procId)); 313 assertEquals(null, procExecutor.getResult(procId)); 314 } 315 316 public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) { 317 Procedure<?> result = procExecutor.getResult(procId); 318 assertTrue("expected procedure result", result != null); 319 assertProcNotFailed(result); 320 } 321 322 public static void assertProcNotFailed(final Procedure<?> result) { 323 assertFalse("found exception: " + result.getException(), result.isFailed()); 324 } 325 326 public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor, 327 final long procId) { 328 Procedure<?> result = procExecutor.getResult(procId); 329 assertTrue("expected procedure result", result != null); 330 return assertProcFailed(result); 331 } 332 333 public static Throwable assertProcFailed(final Procedure<?> result) { 334 assertEquals(true, result.isFailed()); 335 LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage()); 336 return getExceptionCause(result); 337 } 338 339 public static void assertIsAbortException(final Procedure<?> result) { 340 Throwable cause = assertProcFailed(result); 341 assertTrue("expected abort exception, got " + cause, 342 cause instanceof ProcedureAbortedException); 343 } 344 345 public static void assertIsTimeoutException(final Procedure<?> result) { 346 Throwable cause = assertProcFailed(result); 347 assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException); 348 } 349 350 public static void assertIsIllegalArgumentException(final Procedure<?> result) { 351 Throwable cause = assertProcFailed(result); 352 assertTrue("expected IllegalArgumentIOException, got " + cause, 353 cause instanceof IllegalArgumentIOException); 354 } 355 356 public static Throwable getExceptionCause(final Procedure<?> procInfo) { 357 assert procInfo.isFailed(); 358 Throwable cause = procInfo.getException().getCause(); 359 return cause == null ? procInfo.getException() : cause; 360 } 361 362 /** 363 * Run through all procedure flow states TWICE while also restarting procedure executor at each 364 * step; i.e force a reread of procedure store. 365 * <p> 366 * It does 367 * <ol> 368 * <li>Execute step N - kill the executor before store update 369 * <li>Restart executor/store 370 * <li>Execute step N - and then save to store 371 * </ol> 372 * <p> 373 * This is a good test for finding state that needs persisting and steps that are not idempotent. 374 */ 375 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 376 final long procId) throws Exception { 377 testRecoveryAndDoubleExecution(procExec, procId, false); 378 } 379 380 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 381 final long procId, final boolean expectFailure) throws Exception { 382 testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null); 383 } 384 385 public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec, 386 final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception { 387 Procedure proc = procExec.getProcedure(procId); 388 waitProcedure(procExec, procId); 389 assertEquals(false, procExec.isRunning()); 390 for (int i = 0; !procExec.isFinished(procId); ++i) { 391 proc = procExec.getProcedure(procId); 392 LOG.info("Restart " + i + " exec state: " + proc); 393 if (customRestart != null) { 394 customRestart.run(); 395 } else { 396 restart(procExec); 397 } 398 waitProcedure(procExec, procId); 399 } 400 401 assertEquals(true, procExec.isRunning()); 402 if (expectFailure) { 403 assertProcFailed(procExec, procId); 404 } else { 405 assertProcNotFailed(procExec, procId); 406 } 407 } 408 409 public static class NoopProcedure<TEnv> extends Procedure<TEnv> { 410 public NoopProcedure() { 411 } 412 413 @Override 414 protected Procedure<TEnv>[] execute(TEnv env) 415 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 416 return null; 417 } 418 419 @Override 420 protected void rollback(TEnv env) throws IOException, InterruptedException { 421 } 422 423 @Override 424 protected boolean abort(TEnv env) { 425 return false; 426 } 427 428 @Override 429 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 430 } 431 432 @Override 433 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 434 } 435 } 436 437 public static class NoopStateMachineProcedure<TEnv, TState> 438 extends StateMachineProcedure<TEnv, TState> { 439 private TState initialState; 440 private TEnv env; 441 442 public NoopStateMachineProcedure() { 443 } 444 445 public NoopStateMachineProcedure(TEnv env, TState initialState) { 446 this.env = env; 447 this.initialState = initialState; 448 } 449 450 @Override 451 protected Flow executeFromState(TEnv env, TState tState) 452 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 453 return null; 454 } 455 456 @Override 457 protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException { 458 459 } 460 461 @Override 462 protected TState getState(int stateId) { 463 return null; 464 } 465 466 @Override 467 protected int getStateId(TState tState) { 468 return 0; 469 } 470 471 @Override 472 protected TState getInitialState() { 473 return initialState; 474 } 475 } 476 477 public static class TestProcedure extends NoopProcedure<Void> { 478 private byte[] data = null; 479 480 public TestProcedure() { 481 } 482 483 public TestProcedure(long procId) { 484 this(procId, 0); 485 } 486 487 public TestProcedure(long procId, long parentId) { 488 this(procId, parentId, null); 489 } 490 491 public TestProcedure(long procId, long parentId, byte[] data) { 492 this(procId, parentId, parentId, data); 493 } 494 495 public TestProcedure(long procId, long parentId, long rootId, byte[] data) { 496 setData(data); 497 setProcId(procId); 498 if (parentId > 0) { 499 setParentProcId(parentId); 500 } 501 if (rootId > 0 || parentId > 0) { 502 setRootProcId(rootId); 503 } 504 } 505 506 public void addStackId(final int index) { 507 addStackIndex(index); 508 } 509 510 public void setSuccessState() { 511 setState(ProcedureState.SUCCESS); 512 } 513 514 public void setData(final byte[] data) { 515 this.data = data; 516 } 517 518 @Override 519 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 520 ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data); 521 BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString); 522 serializer.serialize(builder.build()); 523 } 524 525 @Override 526 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 527 BytesValue bytesValue = serializer.deserialize(BytesValue.class); 528 ByteString dataString = bytesValue.getValue(); 529 530 if (dataString.isEmpty()) { 531 data = null; 532 } else { 533 data = dataString.toByteArray(); 534 } 535 } 536 537 // Mark acquire/release lock functions public for test uses. 538 @Override 539 public LockState acquireLock(Void env) { 540 return LockState.LOCK_ACQUIRED; 541 } 542 543 @Override 544 public void releaseLock(Void env) { 545 // no-op 546 } 547 } 548 549 public static class LoadCounter implements ProcedureStore.ProcedureLoader { 550 private final ArrayList<Procedure> corrupted = new ArrayList<>(); 551 private final ArrayList<Procedure> completed = new ArrayList<>(); 552 private final ArrayList<Procedure> runnable = new ArrayList<>(); 553 554 private Set<Long> procIds; 555 private long maxProcId = 0; 556 557 public LoadCounter() { 558 this(null); 559 } 560 561 public LoadCounter(final Set<Long> procIds) { 562 this.procIds = procIds; 563 } 564 565 public void reset() { 566 reset(null); 567 } 568 569 public void reset(final Set<Long> procIds) { 570 corrupted.clear(); 571 completed.clear(); 572 runnable.clear(); 573 this.procIds = procIds; 574 this.maxProcId = 0; 575 } 576 577 public long getMaxProcId() { 578 return maxProcId; 579 } 580 581 public ArrayList<Procedure> getRunnables() { 582 return runnable; 583 } 584 585 public int getRunnableCount() { 586 return runnable.size(); 587 } 588 589 public ArrayList<Procedure> getCompleted() { 590 return completed; 591 } 592 593 public int getCompletedCount() { 594 return completed.size(); 595 } 596 597 public int getLoadedCount() { 598 return runnable.size() + completed.size(); 599 } 600 601 public ArrayList<Procedure> getCorrupted() { 602 return corrupted; 603 } 604 605 public int getCorruptedCount() { 606 return corrupted.size(); 607 } 608 609 public boolean isRunnable(final long procId) { 610 for (Procedure proc : runnable) { 611 if (proc.getProcId() == procId) { 612 return true; 613 } 614 } 615 return false; 616 } 617 618 @Override 619 public void setMaxProcId(long maxProcId) { 620 this.maxProcId = maxProcId; 621 } 622 623 @Override 624 public void load(ProcedureIterator procIter) throws IOException { 625 while (procIter.hasNext()) { 626 long procId; 627 if (procIter.isNextFinished()) { 628 Procedure<?> proc = procIter.next(); 629 procId = proc.getProcId(); 630 LOG.debug("loading completed procId=" + procId + ": " + proc); 631 completed.add(proc); 632 } else { 633 Procedure proc = procIter.next(); 634 procId = proc.getProcId(); 635 LOG.debug("loading runnable procId=" + procId + ": " + proc); 636 runnable.add(proc); 637 } 638 if (procIds != null) { 639 assertTrue("procId=" + procId + " unexpected", procIds.contains(procId)); 640 } 641 } 642 } 643 644 @Override 645 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 646 while (procIter.hasNext()) { 647 Procedure proc = procIter.next(); 648 LOG.debug("corrupted procId=" + proc.getProcId() + ": " + proc); 649 corrupted.add(proc); 650 } 651 } 652 } 653}