/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for procedure-framework tests: creating stores, restarting an executor and its
 * store, toggling the executor's test "kill" hooks, submitting/waiting on procedures, and
 * asserting on procedure results. Also bundles a few trivial {@link Procedure} implementations
 * and a counting {@link ProcedureStore.ProcedureLoader}.
 */
public class ProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);

  /** Utility class; not instantiable. */
  private ProcedureTestingUtility() {
  }

  /**
   * Creates a procedure store rooted at {@code dir}; delegates to
   * {@link #createWalStore(Configuration, Path)}.
   */
  public static ProcedureStore createStore(final Configuration conf, final Path dir)
      throws IOException {
    return createWalStore(conf, dir);
  }

  /**
   * Creates a {@link WALProcedureStore} on {@code dir} whose lease recovery is a no-op.
   */
  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
      throws IOException {
    return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
      @Override
      public void recoverFileLease(FileSystem fs, Path path) throws IOException {
        // no-op
      }
    });
  }

  /**
   * Restarts the executor with test kills left enabled during the restart, failing on
   * corruption and with no stop/start callbacks.
   *
   * @param abort passed to {@code ProcedureStore.stop(boolean)}
   * @param startWorkers whether worker threads are started after init
   */
  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
      boolean abort, boolean startWorkers) throws Exception {
    restart(procExecutor, false, true, null, null, abort, startWorkers);
  }

  /**
   * Same as {@link #restart(ProcedureExecutor, boolean, boolean)} with workers started.
   */
  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
      boolean abort) throws Exception {
    restart(procExecutor, false, true, null, null, abort, true);
  }

  /**
   * Same as {@link #restart(ProcedureExecutor, boolean)} with {@code abort == false}.
   */
  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
    restart(procExecutor, false, true, null, null, false, true);
  }

  /**
   * Initializes the executor with {@code numThreads} workers and no urgent workers, then starts
   * the workers.
   */
  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
      boolean abortOnCorruption) throws IOException {
    initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
  }

  /**
   * Initializes the executor with {@code numThreads} workers and no urgent workers, optionally
   * starting the workers.
   */
  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
      boolean abortOnCorruption, boolean startWorkers) throws IOException {
    initAndStartWorkers(procExecutor, numThreads, 0, abortOnCorruption, startWorkers);
  }

  /**
   * Calls {@code procExecutor.init(numThreads, numUrgentThreads, abortOnCorruption)} and, when
   * {@code startWorkers} is set, {@code procExecutor.startWorkers()}.
   */
  public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
      int numUrgentThreads, boolean abortOnCorruption, boolean startWorkers) throws IOException {
    procExecutor.init(numThreads, numUrgentThreads, abortOnCorruption);
    if (startWorkers) {
      procExecutor.startWorkers();
    }
  }

  /**
   * Restarts the executor without aborting the store and with workers started.
   *
   * @see #restart(ProcedureExecutor, boolean, boolean, Callable, Callable, boolean, boolean)
   */
  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
      final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted,
      final Callable<Void> stopAction, final Callable<Void> startAction)
      throws Exception {
    restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted,
        stopAction, startAction, false, true);
  }

  /**
   * Stops the executor and its store, clears the scheduler, then starts the store and
   * re-initializes the executor.
   *
   * @param procExecutor executor to restart
   * @param avoidTestKillDuringRestart when true, {@code procExecutor.testing} is set to
   *          {@code null} for the duration of the restart and restored afterwards (also when the
   *          restart throws)
   * @param failOnCorrupted passed as {@code abortOnCorruption} to the executor's init
   * @param stopAction optional callback run after the store is stopped and before the executor
   *          is joined; may be {@code null}
   * @param startAction optional callback run after the executor has been re-initialized; may be
   *          {@code null}
   * @param abort passed to {@code ProcedureStore.stop(boolean)}
   * @param startWorkers whether worker threads are started after init
   * @throws Exception whatever the stop/start steps or the callbacks throw
   */
  public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
      final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted,
      final Callable<Void> stopAction, final Callable<Void> startAction,
      boolean abort, boolean startWorkers)
      throws Exception {
    final ProcedureStore procStore = procExecutor.getStore();
    // Both the store and the executor are restarted with the executor's core pool size.
    final int storeThreads = procExecutor.getCorePoolSize();
    final int execThreads = procExecutor.getCorePoolSize();
    final int urgentThreads = procExecutor.getUrgentPoolSize();

    final ProcedureExecutor.Testing testing = procExecutor.testing;
    if (avoidTestKillDuringRestart) {
      procExecutor.testing = null;
    }

    try {
      // stop
      LOG.info("RESTART - Stop");
      procExecutor.stop();
      procStore.stop(abort);
      if (stopAction != null) {
        stopAction.call();
      }
      procExecutor.join();
      procExecutor.getScheduler().clear();

      // nothing running...

      // re-start
      LOG.info("RESTART - Start");
      procStore.start(storeThreads);
      initAndStartWorkers(procExecutor, execThreads, urgentThreads, failOnCorrupted, startWorkers);
      if (startAction != null) {
        startAction.call();
      }
    } finally {
      // Restore the hooks even if the restart failed, so a failed restart does not silently
      // leave the executor with its test kill settings removed.
      if (avoidTestKillDuringRestart) {
        procExecutor.testing = testing;
      }
    }
  }

  /**
   * Stops (without abort) and restarts the store with its current thread count, recovers the
   * lease, and loads the store contents into {@code loader}.
   */
  public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
      throws Exception {
    procStore.stop(false);
    procStore.start(procStore.getNumThreads());
    procStore.recoverLease();
    procStore.load(loader);
  }

  /**
   * Restarts the store with a fresh {@link LoadCounter} and asserts the counters it observed.
   *
   * @return the loader used for the restart, for further inspection
   */
  public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
      long runnableCount, int completedCount, int corruptedCount) throws Exception {
    final LoadCounter loader = new LoadCounter();
    storeRestart(procStore, loader);
    assertEquals(maxProcId, loader.getMaxProcId());
    assertEquals(runnableCount, loader.getRunnableCount());
    assertEquals(completedCount, loader.getCompletedCount());
    assertEquals(corruptedCount, loader.getCorruptedCount());
    return loader;
  }

  /** Lazily installs a {@link ProcedureExecutor.Testing} on the executor if it has none. */
  private static <TEnv> void createExecutorTesting(final ProcedureExecutor<TEnv> procExecutor) {
    if (procExecutor.testing == null) {
      procExecutor.testing = new ProcedureExecutor.Testing();
    }
  }

  /** Sets the {@code killIfSuspended} test flag on the executor. */
  public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killIfSuspended = value;
  }

  /**
   * Sets the {@code killBeforeStoreUpdate} test flag and asserts the executor is single-threaded.
   */
  public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killBeforeStoreUpdate = value;
    LOG.warn("Set Kill before store update to: {}", procExecutor.testing.killBeforeStoreUpdate);
    assertSingleExecutorForKillTests(procExecutor);
  }

  /**
   * Sets the {@code toggleKillBeforeStoreUpdate} test flag and asserts the executor is
   * single-threaded.
   */
  public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.toggleKillBeforeStoreUpdate = value;
    assertSingleExecutorForKillTests(procExecutor);
  }

  /** Flips the {@code killBeforeStoreUpdate} test flag. */
  public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
    LOG.warn("Set Kill before store update to: {}", procExecutor.testing.killBeforeStoreUpdate);
    assertSingleExecutorForKillTests(procExecutor);
  }

  /** Flips the {@code killAfterStoreUpdate} test flag. */
  public static <TEnv> void toggleKillAfterStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
    createExecutorTesting(procExecutor);
    procExecutor.testing.killAfterStoreUpdate = !procExecutor.testing.killAfterStoreUpdate;
    LOG.warn("Set Kill after store update to: {}", procExecutor.testing.killAfterStoreUpdate);
    assertSingleExecutorForKillTests(procExecutor);
  }

  /**
   * Sets both {@code killBeforeStoreUpdate} and {@code toggleKillBeforeStoreUpdate} to
   * {@code value}.
   */
  public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
      boolean value) {
    ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
    ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
    assertSingleExecutorForKillTests(procExecutor);
  }

  /**
   * When either kill-before-store-update flag is set, asserts the executor's core pool size
   * is exactly one. Does nothing if the executor has no testing hooks installed.
   */
  private static <TEnv> void assertSingleExecutorForKillTests(
      final ProcedureExecutor<TEnv> procExecutor) {
    if (procExecutor.testing == null) {
      return;
    }
    if (procExecutor.testing.killBeforeStoreUpdate ||
        procExecutor.testing.toggleKillBeforeStoreUpdate) {
      assertEquals("expected only one executor running during test with kill/restart",
        1, procExecutor.getCorePoolSize());
    }
  }

  /**
   * Runs {@code proc} to completion on a throw-away single-worker executor backed by a
   * {@link NoopProcedureStore}; the store and executor are stopped before returning.
   *
   * @return the id assigned to the submitted procedure
   */
  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
      throws IOException {
    NoopProcedureStore procStore = new NoopProcedureStore();
    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
    procStore.start(1);
    initAndStartWorkers(procExecutor, 1, false, true);
    try {
      return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
    } finally {
      procStore.stop(false);
      procExecutor.stop();
    }
  }

  /**
   * Submits {@code proc} without a nonce and waits for it.
   *
   * @return the id assigned to the submitted procedure
   */
  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
    return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  /**
   * Submits {@code proc} with the given nonce and waits for it via
   * {@link #waitProcedure(ProcedureExecutor, long)}.
   *
   * @return the id assigned to the submitted procedure
   */
  public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
      final long nonceGroup, final long nonce) {
    long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
    waitProcedure(procExecutor, procId);
    return procId;
  }

  /**
   * Registers the nonce, asserts that registration returned a negative id (presumably meaning
   * the nonce was not already in use -- confirm against {@code registerNonce}), and submits
   * {@code proc}.
   *
   * @return the id returned by {@code ProcedureExecutor.submitProcedure}
   */
  public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
      final long nonceGroup, final long nonce) {
    final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
    long procId = procExecutor.registerNonce(nonceKey);
    assertFalse(procId >= 0);
    return procExecutor.submitProcedure(proc, nonceKey);
  }

  /**
   * Polls until {@code proc} has left the {@code INITIALIZING} state, then waits on its id.
   */
  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
    while (proc.getState() == ProcedureState.INITIALIZING) {
      Threads.sleepWithoutInterrupt(250);
    }
    waitProcedure(procExecutor, proc.getProcId());
  }

  /**
   * Polls every 250ms until the procedure is finished or the executor is no longer running.
   */
  public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
    while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
      Threads.sleepWithoutInterrupt(250);
    }
  }

  /** Waits, in order, on each of the given procedure ids. */
  public static <TEnv> void waitProcedures(ProcedureExecutor<TEnv> procExecutor, long... procIds) {
    for (long procId : procIds) {
      waitProcedure(procExecutor, procId);
    }
  }

  /** Waits on every id returned by {@code procExecutor.getActiveProcIds()}. */
  public static <TEnv> void waitAllProcedures(ProcedureExecutor<TEnv> procExecutor) {
    for (long procId : procExecutor.getActiveProcIds()) {
      waitProcedure(procExecutor, procId);
    }
  }

  /**
   * Blocks until 10 consecutive polls observe no active executor and an empty scheduler.
   * Any busy observation resets the count.
   */
  public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
    int stableRuns = 0;
    while (stableRuns < 10) {
      if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getScheduler().size() > 0) {
        stableRuns = 0;
        Threads.sleepWithoutInterrupt(100);
      } else {
        stableRuns++;
        Threads.sleepWithoutInterrupt(25);
      }
    }
  }

  /** Asserts the procedure is not finished and has no result yet. */
  public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
      long procId) {
    assertFalse("expected a running proc", procExecutor.isFinished(procId));
    assertEquals(null, procExecutor.getResult(procId));
  }

  /** Asserts the procedure has a result and that the result is not failed. */
  public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
      long procId) {
    Procedure<?> result = procExecutor.getResult(procId);
    assertTrue("expected procedure result", result != null);
    assertProcNotFailed(result);
  }

  /** Asserts {@code result} is not failed, reporting its exception otherwise. */
  public static void assertProcNotFailed(final Procedure<?> result) {
    assertFalse("found exception: " + result.getException(), result.isFailed());
  }

  /**
   * Asserts the procedure has a result and that the result is failed.
   *
   * @return the failure cause, see {@link #getExceptionCause(Procedure)}
   */
  public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
      final long procId) {
    Procedure<?> result = procExecutor.getResult(procId);
    assertTrue("expected procedure result", result != null);
    return assertProcFailed(result);
  }

  /**
   * Asserts {@code result} is failed and logs its exception message.
   *
   * @return the failure cause, see {@link #getExceptionCause(Procedure)}
   */
  public static Throwable assertProcFailed(final Procedure<?> result) {
    assertTrue("expected a failed procedure", result.isFailed());
    LOG.info("procId={} exception: {}", result.getProcId(), result.getException().getMessage());
    return getExceptionCause(result);
  }

  /** Asserts {@code result} failed with a {@link ProcedureAbortedException}. */
  public static void assertIsAbortException(final Procedure<?> result) {
    Throwable cause = assertProcFailed(result);
    assertTrue("expected abort exception, got "+ cause, cause instanceof ProcedureAbortedException);
  }

  /** Asserts {@code result} failed with a {@link TimeoutIOException}. */
  public static void assertIsTimeoutException(final Procedure<?> result) {
    Throwable cause = assertProcFailed(result);
    assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
  }

  /** Asserts {@code result} failed with an {@link IllegalArgumentIOException}. */
  public static void assertIsIllegalArgumentException(final Procedure<?> result) {
    Throwable cause = assertProcFailed(result);
    assertTrue("expected IllegalArgumentIOException, got " + cause,
      cause instanceof IllegalArgumentIOException);
  }

  /**
   * Returns the cause of the procedure's exception, or the exception itself when it has no
   * cause. The procedure must be failed.
   */
  public static Throwable getExceptionCause(final Procedure<?> procInfo) {
    assert procInfo.isFailed();
    Throwable cause = procInfo.getException().getCause();
    return cause == null ? procInfo.getException() : cause;
  }

  /**
   * Run through all procedure flow states TWICE while also restarting
   * procedure executor at each step; i.e force a reread of procedure store.
   *
   *<p>It does
   * <ol><li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   *
   *<p>This is a good test for finding state that needs persisting and steps that are not
   * idempotent.
   */
  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
      final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, false);
  }

  /**
   * Same as {@link #testRecoveryAndDoubleExecution(ProcedureExecutor, long)} but lets the caller
   * state whether the procedure is expected to end up failed.
   */
  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
      final long procId, final boolean expectFailure) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
  }

  /**
   * Repeatedly restarts the executor until the procedure is finished, then asserts on the
   * outcome. The executor is expected to be stopped after the first wait and running once the
   * procedure has finished.
   *
   * @param customRestart when non-null, run instead of {@link #restart(ProcedureExecutor)} for
   *          each restart
   * @param expectFailure whether the finished procedure must be failed (true) or not failed
   */
  public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
      final long procId, final boolean expectFailure, final Runnable customRestart)
      throws Exception {
    waitProcedure(procExec, procId);
    assertFalse("expected the executor to be stopped", procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      final Procedure<?> proc = procExec.getProcedure(procId);
      LOG.info("Restart {} exec state: {}", i, proc);
      if (customRestart != null) {
        customRestart.run();
      } else {
        restart(procExec);
      }
      waitProcedure(procExec, procId);
    }

    assertTrue("expected the executor to be running", procExec.isRunning());
    if (expectFailure) {
      assertProcFailed(procExec, procId);
    } else {
      assertProcNotFailed(procExec, procId);
    }
  }

  /**
   * Procedure whose execute/rollback/serialization hooks do nothing; {@code execute} returns
   * {@code null} and {@code abort} returns {@code false}.
   */
  public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
    public NoopProcedure() {}

    @Override
    protected Procedure<TEnv>[] execute(TEnv env)
        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
      return null;
    }

    @Override
    protected void rollback(TEnv env) throws IOException, InterruptedException {
    }

    @Override
    protected boolean abort(TEnv env) { return false; }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
    }
  }

  /**
   * State-machine procedure with empty hooks; only {@link #getInitialState()} returns a
   * caller-supplied value.
   */
  public static class NoopStateMachineProcedure<TEnv, TState>
      extends StateMachineProcedure<TEnv, TState> {
    private TState initialState;
    private TEnv env;

    public NoopStateMachineProcedure() {
    }

    public NoopStateMachineProcedure(TEnv env, TState initialState) {
      this.env = env;
      this.initialState = initialState;
    }

    @Override
    protected Flow executeFromState(TEnv env, TState tState)
        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
      return null;
    }

    @Override
    protected void rollbackState(TEnv env, TState tState) throws IOException, InterruptedException {

    }

    @Override
    protected TState getState(int stateId) {
      return null;
    }

    @Override
    protected int getStateId(TState tState) {
      return 0;
    }

    @Override
    protected TState getInitialState() {
      return initialState;
    }
  }

  /**
   * {@link NoopProcedure} carrying an optional byte payload that is round-tripped through
   * serialization, with helpers to preset ids, stack indexes and state.
   */
  public static class TestProcedure extends NoopProcedure<Void> {
    private byte[] data = null;

    public TestProcedure() {}

    public TestProcedure(long procId) {
      this(procId, 0);
    }

    public TestProcedure(long procId, long parentId) {
      this(procId, parentId, null);
    }

    /** Uses {@code parentId} as the root id as well. */
    public TestProcedure(long procId, long parentId, byte[] data) {
      this(procId, parentId, parentId, data);
    }

    /**
     * @param parentId applied only when positive
     * @param rootId applied when either {@code rootId} or {@code parentId} is positive
     */
    public TestProcedure(long procId, long parentId, long rootId, byte[] data) {
      setData(data);
      setProcId(procId);
      if (parentId > 0) {
        setParentProcId(parentId);
      }
      if (rootId > 0 || parentId > 0) {
        setRootProcId(rootId);
      }
    }

    public void addStackId(final int index) {
      addStackIndex(index);
    }

    public void setSuccessState() {
      setState(ProcedureState.SUCCESS);
    }

    public void setData(final byte[] data) {
      this.data = data;
    }

    /** Serializes the payload as a {@link BytesValue}; a null payload is written as empty. */
    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
      ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
      BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
      serializer.serialize(builder.build());
    }

    /** Reads the payload back; an empty value is restored as {@code null}. */
    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
      BytesValue bytesValue = serializer.deserialize(BytesValue.class);
      ByteString dataString = bytesValue.getValue();

      if (dataString.isEmpty()) {
        data = null;
      } else {
        data = dataString.toByteArray();
      }
    }

    // Mark acquire/release lock functions public for test uses.
    @Override
    public LockState acquireLock(Void env) {
      return LockState.LOCK_ACQUIRED;
    }

    @Override
    public void releaseLock(Void env) {
      // no-op
    }
  }

  /**
   * {@link ProcedureStore.ProcedureLoader} that records the procedures it is handed, split into
   * runnable, completed and corrupted lists. When constructed or reset with a set of expected
   * ids, every loaded (non-corrupted) procedure id is asserted to be in that set.
   */
  public static class LoadCounter implements ProcedureStore.ProcedureLoader {
    private final ArrayList<Procedure> corrupted = new ArrayList<>();
    private final ArrayList<Procedure> completed = new ArrayList<>();
    private final ArrayList<Procedure> runnable = new ArrayList<>();

    private Set<Long> procIds;
    private long maxProcId = 0;

    public LoadCounter() {
      this(null);
    }

    /** @param procIds ids allowed to be loaded, or {@code null} to skip the check */
    public LoadCounter(final Set<Long> procIds) {
      this.procIds = procIds;
    }

    public void reset() {
      reset(null);
    }

    /** Clears all recorded procedures and the max id, and replaces the expected-id set. */
    public void reset(final Set<Long> procIds) {
      corrupted.clear();
      completed.clear();
      runnable.clear();
      this.procIds = procIds;
      this.maxProcId = 0;
    }

    public long getMaxProcId() {
      return maxProcId;
    }

    public ArrayList<Procedure> getRunnables() {
      return runnable;
    }

    public int getRunnableCount() {
      return runnable.size();
    }

    public ArrayList<Procedure> getCompleted() {
      return completed;
    }

    public int getCompletedCount() {
      return completed.size();
    }

    /** @return number of runnable plus completed procedures (corrupted ones excluded) */
    public int getLoadedCount() {
      return runnable.size() + completed.size();
    }

    public ArrayList<Procedure> getCorrupted() {
      return corrupted;
    }

    public int getCorruptedCount() {
      return corrupted.size();
    }

    /** @return true if a procedure with this id was loaded as runnable */
    public boolean isRunnable(final long procId) {
      for (Procedure<?> proc : runnable) {
        if (proc.getProcId() == procId) {
          return true;
        }
      }
      return false;
    }

    @Override
    public void setMaxProcId(long maxProcId) {
      this.maxProcId = maxProcId;
    }

    @Override
    public void load(ProcedureIterator procIter) throws IOException {
      while (procIter.hasNext()) {
        // Query the finished flag before advancing the iterator.
        final boolean finished = procIter.isNextFinished();
        final Procedure<?> proc = procIter.next();
        final long procId = proc.getProcId();
        if (finished) {
          LOG.debug("loading completed procId={}: {}", procId, proc);
          completed.add(proc);
        } else {
          LOG.debug("loading runnable procId={}: {}", procId, proc);
          runnable.add(proc);
        }
        if (procIds != null) {
          assertTrue("procId=" + procId + " unexpected", procIds.contains(procId));
        }
      }
    }

    @Override
    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
      while (procIter.hasNext()) {
        final Procedure<?> proc = procIter.next();
        LOG.debug("corrupted procId={}: {}", proc.getProcId(), proc);
        corrupted.add(proc);
      }
    }
  }
}