/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Static helpers for procedure-framework tests: store creation, executor restart, toggling of
 * the executor's {@code testing} kill flags, submit/wait helpers, result assertions and a few
 * minimal {@link Procedure} implementations.
 */
public final class ProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);

  private ProcedureTestingUtility() {
  }

  /**
   * Creates the procedure store used by tests; currently delegates to
   * {@link #createWalStore(Configuration, Path)}.
   */
  public static ProcedureStore createStore(final Configuration conf, final Path dir)
      throws IOException {
    return createWalStore(conf, dir);
  }

  /**
   * Creates a {@link WALProcedureStore} over {@code dir} whose lease recovery is a no-op.
   */
  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
      throws IOException {
    return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
      @Override
      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
        // no-op
      }
    });
  }

  /**
   * Restarts the executor and its store with no callbacks, failing on corrupted procedures.
   *
   * @param abort passed to {@code ProcedureStore.stop(boolean)}
   * @param startWorkers whether to start the executor workers after re-initialization
   */
  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
      boolean abort, boolean startWorkers) throws Exception {
    restart(procExecutor, false, true, null, null, null, abort, startWorkers);
  }

  /**
   * Restarts the executor and its store with no callbacks and starts the workers again.
   *
   * @param abort passed to {@code ProcedureStore.stop(boolean)}
   */
  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
      boolean abort) throws Exception {
    restart(procExecutor, false, true, null, null, null, abort, true);
  }

  /**
   * Restarts the executor and its store with no callbacks, without abort, and starts the
   * workers again.
   */
  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
    restart(procExecutor, false, true, null, null, null, false, true);
  }

  /**
   * Initializes the executor and starts its workers.
   *
   * @see #initAndStartWorkers(ProcedureExecutor, int, boolean, boolean)
   */
  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
      boolean abortOnCorruption) throws IOException {
    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
  }

  /**
   * Calls {@code procExecutor.init(numThreads, abortOnCorruption)} and, when
   * {@code startWorkers} is true, {@code procExecutor.startWorkers()}.
   */
  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
      boolean abortOnCorruption, boolean startWorkers) throws IOException {
    procExecutor.init(numThreads, abortOnCorruption);
    if (startWorkers) {
      procExecutor.startWorkers();
    }
  }

  /**
   * Restarts the executor and its store without abort and starts the workers again.
   *
   * @see #restart(ProcedureExecutor, boolean, boolean, Callable, Callable, Callable, boolean,
   *      boolean)
   */
  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
      boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
      Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
    restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction,
      actionBeforeStartWorker, startAction, false, true);
  }

  /**
   * Stops the executor and its store, then starts the store and re-initializes the executor.
   *
   * @param procExecutor the executor to restart; its store is obtained via {@code getStore()}
   * @param avoidTestKillDuringRestart when true, the executor's {@code testing} hooks are
   *          detached while stopping/re-initializing and re-attached before workers start
   * @param failOnCorrupted second argument passed to {@code procExecutor.init(...)}
   * @param stopAction optional callback run after the store is stopped and before the executor
   *          is joined; may be null
   * @param actionBeforeStartWorker optional callback run after {@code init} and before the
   *          workers are started; may be null
   * @param startAction optional callback run last; may be null
   * @param abort passed to {@code ProcedureStore.stop(boolean)}
   * @param startWorkers whether to call {@code procExecutor.startWorkers()}
   * @throws Exception whatever the executor, the store or a callback throws
   */
  public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
      boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
      Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
      boolean startWorkers) throws Exception {
    final ProcedureStore procStore = procExecutor.getStore();
    // NOTE(review): the store is restarted with the executor's core pool size rather than
    // procStore.getNumThreads() - confirm whether that is intentional.
    final int storeThreads = procExecutor.getCorePoolSize();
    final int execThreads = procExecutor.getCorePoolSize();

    final ProcedureExecutor.Testing testing = procExecutor.testing;
    if (avoidTestKillDuringRestart) {
      procExecutor.testing = null;
    }

    try {
      // stop
      LOG.info("RESTART - Stop");
      procExecutor.stop();
      procStore.stop(abort);
      if (stopAction != null) {
        stopAction.call();
      }
      procExecutor.join();
      procExecutor.getScheduler().clear();

      // nothing running...

      // re-start
      LOG.info("RESTART - Start");
      procStore.start(storeThreads);
      procExecutor.init(execThreads, failOnCorrupted);
      if (actionBeforeStartWorker != null) {
        actionBeforeStartWorker.call();
      }
    } finally {
      // Re-attach the hooks even when stop/init/callbacks throw, so a failed restart does not
      // silently leave the executor without its testing configuration.
      if (avoidTestKillDuringRestart) {
        procExecutor.testing = testing;
      }
    }

    if (startWorkers) {
      procExecutor.startWorkers();
    }
    if (startAction != null) {
      startAction.call();
    }
  }

  /**
   * Stops the store (without abort), starts it again with the same number of threads, recovers
   * the lease and loads its content through {@code loader}.
   */
  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
      throws Exception {
    procStore.stop(false);
    procStore.start(procStore.getNumThreads());
    procStore.recoverLease();
    procStore.load(loader);
  }

  /**
   * Restarts the store with a fresh {@link LoadCounter} and asserts the loaded counters.
   *
   * @return the {@link LoadCounter} that observed the load
   */
  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
      long runnableCount, int completedCount, int corruptedCount) throws Exception {
    final LoadCounter loader = new LoadCounter();
    storeRestart(procStore, loader);
    assertEquals(maxProcId, loader.getMaxProcId());
    assertEquals(runnableCount, loader.getRunnableCount());
    assertEquals(completedCount, loader.getCompletedCount());
    assertEquals(corruptedCount, loader.getCorruptedCount());
    return loader;
  }

  /** Lazily creates the executor's {@code testing} hook holder. */
  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
    if (procExecutor.testing == null) {
      procExecutor.testing = new ProcedureExecutor.Testing();
    }
  }

  /** Sets the executor's {@code testing.killIfHasParent} flag. */
  public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killIfHasParent = value;
  }

  /** Sets the executor's {@code testing.killIfSuspended} flag. */
  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killIfSuspended = value;
  }

  /**
   * Sets the executor's {@code testing.killBeforeStoreUpdate} flag and asserts the executor is
   * single-threaded if a kill flag is active.
   */
  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killBeforeStoreUpdate = value;
    LOG.warn("Set Kill before store update to: {}", procExecutor.testing.killBeforeStoreUpdate);
    assertSingleExecutorForKillTests(procExecutor);
  }

  /**
   * Sets the executor's {@code testing.toggleKillBeforeStoreUpdate} flag and asserts the
   * executor is single-threaded if a kill flag is active.
   */
  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
    assertSingleExecutorForKillTests(procExecutor);
  }

  /** Inverts the executor's {@code testing.killBeforeStoreUpdate} flag. */
  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
    LOG.warn("Set Kill before store update to: {}", procExecutor.testing.killBeforeStoreUpdate);
    assertSingleExecutorForKillTests(procExecutor);
  }

  /** Inverts the executor's {@code testing.killAfterStoreUpdate} flag. */
  public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate;
    LOG.warn("Set Kill after store update to: {}", procExecutor.testing.killAfterStoreUpdate);
    assertSingleExecutorForKillTests(procExecutor);
  }

  /**
   * Sets both {@code killBeforeStoreUpdate} and {@code toggleKillBeforeStoreUpdate} to
   * {@code value}.
   */
  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
    assertSingleExecutorForKillTests(procExecutor);
  }

  /**
   * Asserts that the executor has a core pool size of 1 whenever {@code killBeforeStoreUpdate}
   * or {@code toggleKillBeforeStoreUpdate} is set. Does nothing when no testing hooks exist.
   */
  private static <TEnv> void assertSingleExecutorForKillTests(
      final ProcedureExecutor<TEnv> procExecutor) {
    if (procExecutor.testing == null) {
      return;
    }

    if (procExecutor.testing.killBeforeStoreUpdate ||
        procExecutor.testing.toggleKillBeforeStoreUpdate) {
      assertEquals("expected only one executor running during test with kill/restart",
        1, procExecutor.getCorePoolSize());
    }
  }

  /**
   * Runs {@code proc} to completion on a throw-away single-threaded executor backed by a
   * {@link NoopProcedureStore}.
   *
   * @return the id assigned to the submitted procedure
   */
  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
      throws IOException {
    NoopProcedureStore procStore = new NoopProcedureStore();
    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
    procStore.start(1);
    initAndStartWorkers(procExecutor, 1, false, true);
    try {
      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
    } finally {
      // Signal the executor first and stop the store second: the same order restart() uses.
      procExecutor.stop();
      procStore.stop(false);
    }
  }

  /**
   * Submits {@code proc} without a nonce and waits for it.
   *
   * @return the id assigned to the submitted procedure
   */
  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  /**
   * Submits {@code proc} with the given nonce and waits for it via
   * {@link #waitProcedure(ProcedureExecutor, long)}.
   *
   * @return the id assigned to the submitted procedure
   */
  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
      final long nonceGroup, final long nonce) {
    long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
    waitProcedure(procExecutor, procId);
    return procId;
  }

  /**
   * Registers the nonce, asserts that registration returned a negative id, and submits
   * {@code proc} under that nonce key.
   *
   * @return the id returned by {@code procExecutor.submitProcedure(...)}
   */
  public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
      final long nonceGroup, final long nonce) {
    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
    long procId = procExecutor.registerNonce(nonceKey);
    // presumably a non-negative id means the nonce is already bound to a procedure - confirm
    assertFalse(procId >= 0);
    return procExecutor.submitProcedure(proc, nonceKey);
  }

  /**
   * Polls until {@code proc} leaves the {@code INITIALIZING} state, then waits on its id.
   */
  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
    while (proc.getState() == ProcedureState.INITIALIZING) {
      Threads.sleepWithoutInterrupt(250);
    }
    waitProcedure(procExecutor, proc.getProcId());
  }

  /**
   * Polls every 250ms until the procedure is finished or the executor is no longer running.
   */
  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
      Threads.sleepWithoutInterrupt(250);
    }
  }

  /** Waits for each of the given procedure ids in order. */
  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
    for (long procId : procIds) {
      waitProcedure(procExecutor, procId);
    }
  }

  /** Waits for every id returned by {@code procExecutor.getActiveProcIds()}. */
  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
    for (long procId : procExecutor.getActiveProcIds()) {
      waitProcedure(procExecutor, procId);
    }
  }

  /**
   * Blocks until the executor has been observed idle (no active executor and an empty
   * scheduler) on 10 consecutive polls; any busy observation resets the count.
   */
  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
    int stableRuns = 0;
    while (stableRuns < 10) {
      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
        stableRuns = 0;
        Threads.sleepWithoutInterrupt(100);
      } else {
        stableRuns++;
        Threads.sleepWithoutInterrupt(25);
      }
    }
  }

  /** Asserts the procedure is not finished and has no result yet. */
  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
      long procId) {
    assertFalse("expected a running proc", procExecutor.isFinished(procId));
    assertNull(procExecutor.getResult(procId));
  }

  /** Asserts the procedure has a result and that the result is not failed. */
  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
      long procId) {
    Procedure<?> result = procExecutor.getResult(procId);
    assertNotNull("expected procedure result", result);
    assertProcNotFailed(result);
  }

  /** Asserts that {@code result} is not failed. */
  public static void assertProcNotFailed(final Procedure<?> result) {
    assertFalse("found exception: " + result.getException(), result.isFailed());
  }

  /**
   * Asserts the procedure has a result and that the result is failed.
   *
   * @return the failure cause, see {@link #getExceptionCause(Procedure)}
   */
  public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
      final long procId) {
    Procedure<?> result = procExecutor.getResult(procId);
    assertNotNull("expected procedure result", result);
    return assertProcFailed(result);
  }

  /**
   * Asserts that {@code result} is failed and logs its exception message.
   *
   * @return the failure cause, see {@link #getExceptionCause(Procedure)}
   */
  public static Throwable assertProcFailed(final Procedure<?> result) {
    assertTrue("expected a failed procedure", result.isFailed());
    LOG.info("procId={} exception: {}", result.getProcId(), result.getException().getMessage());
    return getExceptionCause(result);
  }

  /** Asserts that {@code result} failed with a {@link ProcedureAbortedException}. */
  public static void assertIsAbortException(final Procedure<?> result) {
    Throwable cause = assertProcFailed(result);
    assertTrue("expected abort exception, got "+ cause, cause instanceof ProcedureAbortedException);
  }

  /** Asserts that {@code result} failed with a {@link TimeoutIOException}. */
  public static void assertIsTimeoutException(final Procedure<?> result) {
    Throwable cause = assertProcFailed(result);
    assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
  }

  /** Asserts that {@code result} failed with an {@link IllegalArgumentIOException}. */
  public static void assertIsIllegalArgumentException(final Procedure<?> result) {
    Throwable cause = assertProcFailed(result);
    assertTrue("expected IllegalArgumentIOException, got " + cause,
      cause instanceof IllegalArgumentIOException);
  }

  /**
   * Returns the cause of the procedure's exception, or the exception itself when it has no
   * cause. The procedure is expected to be failed.
   */
  public static Throwable getExceptionCause(final Procedure<?> procInfo) {
    assert procInfo.isFailed();
    Throwable cause = procInfo.getException().getCause();
    return cause == null ? procInfo.getException() : cause;
  }

  /**
   * Run through all procedure flow states TWICE while also restarting
   * procedure executor at each step; i.e force a reread of procedure store.
   *
   * <p>It does
   * <ol><li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   *
   * <p>This is a good test for finding state that needs persisting and steps that are not
   * idempotent.
   */
  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
      final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, false);
  }

  /**
   * Same as {@link #testRecoveryAndDoubleExecution(ProcedureExecutor, long)} but lets the
   * caller state whether the procedure is expected to end up failed.
   */
  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
      final long procId, final boolean expectFailure) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
  }

  /**
   * Waits for the procedure, asserts the executor is not running, then restarts repeatedly
   * until the procedure is finished, and finally asserts the expected outcome.
   *
   * @param customRestart when non-null, run instead of {@link #restart(ProcedureExecutor)} on
   *          each iteration
   */
  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
      final long procId, final boolean expectFailure, final Runnable customRestart)
      throws Exception {
    waitProcedure(procExec, procId);
    assertFalse("expected the executor to not be running", procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      final Procedure<?> proc = procExec.getProcedure(procId);
      LOG.info("Restart {} exec state: {}", i, proc);
      if (customRestart != null) {
        customRestart.run();
      } else {
        restart(procExec);
      }
      waitProcedure(procExec, procId);
    }

    assertTrue("expected the executor to be running", procExec.isRunning());
    if (expectFailure) {
      assertProcFailed(procExec, procId);
    } else {
      assertProcNotFailed(procExec, procId);
    }
  }

  /**
   * Procedure whose execute/rollback/serialization hooks all do nothing; {@code abort}
   * returns false.
   */
  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
    public NoopProcedure() {}

    @Override
    protected Procedure<TEnv>[] execute(TEnv env)
        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
      return null;
    }

    @Override
    protected void rollback(TEnv env) throws IOException, InterruptedException {
    }

    @Override
    protected boolean abort(TEnv env) {
      return false;
    }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
    }
  }

  /**
   * State-machine procedure whose hooks do nothing; only {@link #getInitialState()} returns
   * a caller-supplied value.
   */
  public static class NoopStateMachineProcedure<TEnv, TState>
      extends StateMachineProcedure<TEnv, TState> {
    private TState initialState;
    // Stored by the two-arg constructor but not read anywhere in this class.
    private TEnv env;

    public NoopStateMachineProcedure() {
    }

    public NoopStateMachineProcedure(TEnv env, TState initialState) {
      this.env = env;
      this.initialState = initialState;
    }

    @Override
    protected Flow executeFromState(TEnv env, TState tState)
        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
      return null;
    }

    @Override
    protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {

    }

    @Override
    protected TState getState(int stateId) {
      return null;
    }

    @Override
    protected int getStateId(TState tState) {
      return 0;
    }

    @Override
    protected TState getInitialState() {
      return initialState;
    }
  }

  /**
   * {@link NoopProcedure} with explicit proc/parent/root ids and an optional byte payload that
   * is written to and read from the procedure state.
   */
  public static class TestProcedure extends NoopProcedure<Void> {
    private byte[] data = null;

    public TestProcedure() {}

    public TestProcedure(long procId) {
      this(procId, 0);
    }

    public TestProcedure(long procId, long parentId) {
      this(procId, parentId, null);
    }

    public TestProcedure(long procId, long parentId, byte[] data) {
      this(procId, parentId, parentId, data);
    }

    /**
     * @param parentId applied only when greater than zero
     * @param rootId applied when either {@code rootId} or {@code parentId} is greater than zero
     */
    public TestProcedure(long procId, long parentId, long rootId, byte[] data) {
      setData(data);
      setProcId(procId);
      if (parentId > 0) {
        setParentProcId(parentId);
      }
      if (rootId > 0 || parentId > 0) {
        setRootProcId(rootId);
      }
    }

    public void addStackId(final int index) {
      addStackIndex(index);
    }

    public void setSuccessState() {
      setState(ProcedureState.SUCCESS);
    }

    public void setData(final byte[] data) {
      this.data = data;
    }

    /** Serializes the payload as a {@link BytesValue}; a null payload is written as empty. */
    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
      ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
      BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
      serializer.serialize(builder.build());
    }

    /** Reads the payload back; an empty value is restored as a null payload. */
    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
      BytesValue bytesValue = serializer.deserialize(BytesValue.class);
      ByteString dataString = bytesValue.getValue();

      if (dataString.isEmpty()) {
        data = null;
      } else {
        data = dataString.toByteArray();
      }
    }

    // Mark acquire/release lock functions public for test uses.
    @Override
    public LockState acquireLock(Void env) {
      return LockState.LOCK_ACQUIRED;
    }

    @Override
    public void releaseLock(Void env) {
      // no-op
    }
  }

  /**
   * {@link ProcedureStore.ProcedureLoader} that records the loaded procedures in three lists
   * (runnable, completed, corrupted) and optionally asserts that every loaded id belongs to
   * an expected set.
   */
  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
    private final ArrayList<Procedure> corrupted = new ArrayList<>();
    private final ArrayList<Procedure> completed = new ArrayList<>();
    private final ArrayList<Procedure> runnable = new ArrayList<>();

    private Set<Long> procIds;
    private long maxProcId = 0;

    public LoadCounter() {
      this(null);
    }

    /**
     * @param procIds ids allowed to show up in {@link #load(ProcedureIterator)}; null disables
     *          the check
     */
    public LoadCounter(final Set<Long> procIds) {
      this.procIds = procIds;
    }

    public void reset() {
      reset(null);
    }

    /** Clears all recorded procedures, resets the max id and replaces the expected id set. */
    public void reset(final Set<Long> procIds) {
      corrupted.clear();
      completed.clear();
      runnable.clear();
      this.procIds = procIds;
      this.maxProcId = 0;
    }

    public long getMaxProcId() {
      return maxProcId;
    }

    public ArrayList<Procedure> getRunnables() {
      return runnable;
    }

    public int getRunnableCount() {
      return runnable.size();
    }

    public ArrayList<Procedure> getCompleted() {
      return completed;
    }

    public int getCompletedCount() {
      return completed.size();
    }

    /** Returns the number of runnable plus completed procedures (corrupted ones excluded). */
    public int getLoadedCount() {
      return runnable.size() + completed.size();
    }

    public ArrayList<Procedure> getCorrupted() {
      return corrupted;
    }

    public int getCorruptedCount() {
      return corrupted.size();
    }

    /** Returns true when a procedure with the given id was loaded as runnable. */
    public boolean isRunnable(final long procId) {
      for (Procedure<?> proc : runnable) {
        if (proc.getProcId() == procId) {
          return true;
        }
      }
      return false;
    }

    @Override
    public void setMaxProcId(long maxProcId) {
      this.maxProcId = maxProcId;
    }

    /**
     * Drains the iterator, filing each procedure as completed or runnable, and asserts its id
     * against the expected set when one was supplied.
     */
    @Override
    public void load(ProcedureIterator procIter) throws IOException {
      while (procIter.hasNext()) {
        // The finished flag describes the upcoming element, so read it before next().
        final boolean finished = procIter.isNextFinished();
        final Procedure<?> proc = procIter.next();
        final long procId = proc.getProcId();
        if (finished) {
          LOG.debug("loading completed procId={}: {}", procId, proc);
          completed.add(proc);
        } else {
          LOG.debug("loading runnable procId={}: {}", procId, proc);
          runnable.add(proc);
        }
        if (procIds != null) {
          assertTrue("procId=" + procId + " unexpected", procIds.contains(procId));
        }
      }
    }

    /** Drains the iterator, recording every procedure as corrupted. */
    @Override
    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
      while (procIter.hasNext()) {
        final Procedure<?> proc = procIter.next();
        LOG.debug("corrupted procId={}: {}", proc.getProcId(), proc);
        corrupted.add(proc);
      }
    }
  }
}