/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

044import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value; 045 046@Tag(MasterTests.TAG) 047@Tag(MediumTests.TAG) 048public class TestProcedureRecovery { 049 050 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class); 051 052 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 053 054 private static TestProcEnv procEnv; 055 private static ProcedureExecutor<TestProcEnv> procExecutor; 056 private static ProcedureStore procStore; 057 private static int procSleepInterval; 058 059 private HBaseCommonTestingUtil htu; 060 private FileSystem fs; 061 private Path testDir; 062 private Path logDir; 063 064 @BeforeEach 065 public void setUp() throws IOException { 066 htu = new HBaseCommonTestingUtil(); 067 testDir = htu.getDataTestDir(); 068 fs = testDir.getFileSystem(htu.getConfiguration()); 069 assertTrue(testDir.depth() > 1); 070 071 logDir = new Path(testDir, "proc-logs"); 072 procEnv = new TestProcEnv(); 073 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); 074 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 075 procExecutor.testing = new ProcedureExecutor.Testing(); 076 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 077 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 078 procSleepInterval = 0; 079 } 080 081 @AfterEach 082 public void tearDown() throws IOException { 083 procExecutor.stop(); 084 procStore.stop(false); 085 fs.delete(logDir, true); 086 } 087 088 private void restart() throws Exception { 089 dumpLogDirState(); 090 ProcedureTestingUtility.restart(procExecutor); 091 dumpLogDirState(); 092 } 093 094 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { 095 private int step = 0; 096 097 public TestSingleStepProcedure() { 098 } 099 100 @Override 101 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 102 env.waitOnLatch(); 103 LOG.debug("execute procedure " + 
this + " step=" + step); 104 step++; 105 setResult(Bytes.toBytes(step)); 106 return null; 107 } 108 109 @Override 110 protected void rollback(TestProcEnv env) { 111 } 112 113 @Override 114 protected boolean abort(TestProcEnv env) { 115 return true; 116 } 117 } 118 119 public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> { 120 private AtomicBoolean abort = new AtomicBoolean(false); 121 private int step = 0; 122 123 @Override 124 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 125 env.waitOnLatch(); 126 LOG.debug("execute procedure " + this + " step=" + step); 127 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 128 ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor); 129 step++; 130 Threads.sleepWithoutInterrupt(procSleepInterval); 131 if (isAborted()) { 132 setFailure(new RemoteProcedureException(getClass().getName(), new ProcedureAbortedException( 133 "got an abort at " + getClass().getName() + " step=" + step))); 134 return null; 135 } 136 return null; 137 } 138 139 @Override 140 protected void rollback(TestProcEnv env) { 141 LOG.debug("rollback procedure " + this + " step=" + step); 142 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 143 ProcedureTestingUtility.toggleKillBeforeStoreUpdateInRollback(procExecutor); 144 step++; 145 } 146 147 @Override 148 protected boolean abort(TestProcEnv env) { 149 abort.set(true); 150 return true; 151 } 152 153 private boolean isAborted() { 154 boolean aborted = abort.get(); 155 BaseTestStepProcedure proc = this; 156 while (proc.hasParent() && !aborted) { 157 proc = (BaseTestStepProcedure) procExecutor.getProcedure(proc.getParentProcId()); 158 aborted = proc.isAborted(); 159 } 160 return aborted; 161 } 162 } 163 164 public static class TestMultiStepProcedure extends BaseTestStepProcedure { 165 public TestMultiStepProcedure() { 166 } 167 168 @Override 169 public Procedure[] execute(TestProcEnv env) throws 
InterruptedException { 170 super.execute(env); 171 return isFailed() ? null : new Procedure[] { new Step1Procedure() }; 172 } 173 174 public static class Step1Procedure extends BaseTestStepProcedure { 175 public Step1Procedure() { 176 } 177 178 @Override 179 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 180 super.execute(env); 181 return isFailed() ? null : new Procedure[] { new Step2Procedure() }; 182 } 183 } 184 185 public static class Step2Procedure extends BaseTestStepProcedure { 186 public Step2Procedure() { 187 } 188 } 189 } 190 191 @Test 192 public void testNoopLoad() throws Exception { 193 restart(); 194 } 195 196 @Test 197 public void testSingleStepProcRecovery() throws Exception { 198 Procedure proc = new TestSingleStepProcedure(); 199 procExecutor.testing.killBeforeStoreUpdate = true; 200 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 201 assertFalse(procExecutor.isRunning()); 202 procExecutor.testing.killBeforeStoreUpdate = false; 203 204 // Restart and verify that the procedures restart 205 long restartTs = EnvironmentEdgeManager.currentTime(); 206 restart(); 207 waitProcedure(procId); 208 Procedure<?> result = procExecutor.getResult(procId); 209 assertTrue(result.getLastUpdate() > restartTs); 210 ProcedureTestingUtility.assertProcNotFailed(result); 211 assertEquals(1, Bytes.toInt(result.getResult())); 212 long resultTs = result.getLastUpdate(); 213 214 // Verify that after another restart the result is still there 215 restart(); 216 result = procExecutor.getResult(procId); 217 ProcedureTestingUtility.assertProcNotFailed(result); 218 assertEquals(resultTs, result.getLastUpdate()); 219 assertEquals(1, Bytes.toInt(result.getResult())); 220 } 221 222 @Test 223 public void testMultiStepProcRecovery() throws Exception { 224 // Step 0 - kill 225 Procedure proc = new TestMultiStepProcedure(); 226 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 227 
assertFalse(procExecutor.isRunning()); 228 229 // Step 0 exec && Step 1 - kill 230 restart(); 231 waitProcedure(procId); 232 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 233 assertFalse(procExecutor.isRunning()); 234 235 // Step 1 exec && step 2 - kill 236 restart(); 237 waitProcedure(procId); 238 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 239 assertFalse(procExecutor.isRunning()); 240 241 // Step 2 exec 242 restart(); 243 waitProcedure(procId); 244 assertTrue(procExecutor.isRunning()); 245 246 // The procedure is completed 247 Procedure<?> result = procExecutor.getResult(procId); 248 ProcedureTestingUtility.assertProcNotFailed(result); 249 } 250 251 @Test 252 public void testMultiStepRollbackRecovery() throws Exception { 253 // Step 0 - kill 254 Procedure proc = new TestMultiStepProcedure(); 255 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 256 assertFalse(procExecutor.isRunning()); 257 258 // Step 0 exec && Step 1 - kill 259 restart(); 260 waitProcedure(procId); 261 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 262 assertFalse(procExecutor.isRunning()); 263 264 // Step 1 exec && step 2 - kill 265 restart(); 266 waitProcedure(procId); 267 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 268 assertFalse(procExecutor.isRunning()); 269 270 // Step 2 exec - rollback - kill 271 procSleepInterval = 2500; 272 restart(); 273 assertTrue(procExecutor.abort(procId)); 274 waitProcedure(procId); 275 assertFalse(procExecutor.isRunning()); 276 277 // rollback - kill 278 restart(); 279 waitProcedure(procId); 280 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 281 assertFalse(procExecutor.isRunning()); 282 283 // rollback - complete 284 restart(); 285 waitProcedure(procId); 286 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 287 assertFalse(procExecutor.isRunning()); 288 289 // Restart the executor 
and get the result 290 restart(); 291 waitProcedure(procId); 292 293 // The procedure is completed 294 Procedure<?> result = procExecutor.getResult(procId); 295 ProcedureTestingUtility.assertIsAbortException(result); 296 } 297 298 public static class TestStateMachineProcedure 299 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { 300 enum State { 301 STATE_1, 302 STATE_2, 303 STATE_3, 304 DONE 305 } 306 307 public TestStateMachineProcedure() { 308 } 309 310 public TestStateMachineProcedure(final boolean testSubmitChildProc) { 311 this.submitChildProc = testSubmitChildProc; 312 } 313 314 private AtomicBoolean aborted = new AtomicBoolean(false); 315 private int iResult = 0; 316 private boolean submitChildProc = false; 317 318 @Override 319 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { 320 switch (state) { 321 case STATE_1: 322 LOG.info("execute step 1 " + this); 323 setNextState(State.STATE_2); 324 iResult += 3; 325 break; 326 case STATE_2: 327 LOG.info("execute step 2 " + this); 328 if (submitChildProc) { 329 addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure()); 330 setNextState(State.DONE); 331 } else { 332 setNextState(State.STATE_3); 333 } 334 iResult += 5; 335 break; 336 case STATE_3: 337 LOG.info("execute step 3 " + this); 338 Threads.sleepWithoutInterrupt(procSleepInterval); 339 if (aborted.get()) { 340 LOG.info("aborted step 3 " + this); 341 setAbortFailure("test", "aborted"); 342 break; 343 } 344 setNextState(State.DONE); 345 iResult += 7; 346 break; 347 case DONE: 348 if (submitChildProc) { 349 addChildProcedure(new TestStateMachineProcedure()); 350 } 351 iResult += 11; 352 setResult(Bytes.toBytes(iResult)); 353 return Flow.NO_MORE_STATE; 354 default: 355 throw new UnsupportedOperationException(); 356 } 357 return Flow.HAS_MORE_STATE; 358 } 359 360 @Override 361 protected boolean isRollbackSupported(State state) { 362 return true; 363 } 364 365 @Override 366 
protected void rollbackState(TestProcEnv env, final State state) { 367 switch (state) { 368 case STATE_1: 369 LOG.info("rollback step 1 " + this); 370 break; 371 case STATE_2: 372 LOG.info("rollback step 2 " + this); 373 break; 374 case STATE_3: 375 LOG.info("rollback step 3 " + this); 376 break; 377 default: 378 throw new UnsupportedOperationException(); 379 } 380 } 381 382 @Override 383 protected State getState(final int stateId) { 384 return State.values()[stateId]; 385 } 386 387 @Override 388 protected int getStateId(final State state) { 389 return state.ordinal(); 390 } 391 392 @Override 393 protected State getInitialState() { 394 return State.STATE_1; 395 } 396 397 @Override 398 protected boolean abort(TestProcEnv env) { 399 aborted.set(true); 400 return true; 401 } 402 403 @Override 404 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 405 super.serializeStateData(serializer); 406 Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult); 407 serializer.serialize(builder.build()); 408 } 409 410 @Override 411 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 412 super.deserializeStateData(serializer); 413 Int32Value value = serializer.deserialize(Int32Value.class); 414 iResult = value.getValue(); 415 } 416 } 417 418 @Test 419 public void testStateMachineMultipleLevel() throws Exception { 420 long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true)); 421 // Wait the completion 422 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 423 Procedure<?> result = procExecutor.getResult(procId); 424 ProcedureTestingUtility.assertProcNotFailed(result); 425 assertEquals(19, Bytes.toInt(result.getResult())); 426 assertEquals(4, procExecutor.getLastProcId()); 427 } 428 429 @Test 430 public void testStateMachineRecovery() throws Exception { 431 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); 432 
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true); 433 434 // Step 1 - kill 435 Procedure proc = new TestStateMachineProcedure(); 436 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 437 assertFalse(procExecutor.isRunning()); 438 439 // Step 1 exec && Step 2 - kill 440 restart(); 441 waitProcedure(procId); 442 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 443 assertFalse(procExecutor.isRunning()); 444 445 // Step 2 exec && step 3 - kill 446 restart(); 447 waitProcedure(procId); 448 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 449 assertFalse(procExecutor.isRunning()); 450 451 // Step 3 exec 452 restart(); 453 waitProcedure(procId); 454 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 455 assertFalse(procExecutor.isRunning()); 456 457 restart(); 458 waitProcedure(procId); 459 assertTrue(procExecutor.isRunning()); 460 461 // The procedure is completed 462 Procedure<?> result = procExecutor.getResult(procId); 463 ProcedureTestingUtility.assertProcNotFailed(result); 464 assertEquals(26, Bytes.toInt(result.getResult())); 465 } 466 467 @Test 468 public void testStateMachineRollbackRecovery() throws Exception { 469 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true); 470 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdateInRollback(procExecutor, true); 471 472 // Step 1 - kill 473 Procedure proc = new TestStateMachineProcedure(); 474 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 475 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 476 assertFalse(procExecutor.isRunning()); 477 478 // Step 1 exec && Step 2 - kill 479 restart(); 480 waitProcedure(procId); 481 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 482 assertFalse(procExecutor.isRunning()); 483 484 // Step 2 exec && step 3 - kill 485 restart(); 486 waitProcedure(procId); 
487 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 488 assertFalse(procExecutor.isRunning()); 489 490 // Step 3 exec - rollback step 3 - kill 491 procSleepInterval = 2500; 492 restart(); 493 assertTrue(procExecutor.abort(procId)); 494 waitProcedure(procId); 495 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 496 assertFalse(procExecutor.isRunning()); 497 498 // Rollback step 3 - rollback step 2 - kill 499 restart(); 500 waitProcedure(procId); 501 assertFalse(procExecutor.isRunning()); 502 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 503 504 // Rollback step 2 - step 1 - kill 505 restart(); 506 waitProcedure(procId); 507 assertFalse(procExecutor.isRunning()); 508 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 509 510 // Rollback step 1 - complete 511 restart(); 512 waitProcedure(procId); 513 assertTrue(procExecutor.isRunning()); 514 515 // The procedure is completed 516 Procedure<?> result = procExecutor.getResult(procId); 517 ProcedureTestingUtility.assertIsAbortException(result); 518 } 519 520 private void waitProcedure(final long procId) { 521 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 522 dumpLogDirState(); 523 } 524 525 private void dumpLogDirState() { 526 try { 527 FileStatus[] files = fs.listStatus(logDir); 528 if (files != null && files.length > 0) { 529 for (FileStatus file : files) { 530 assertTrue(file.isFile(), file.toString()); 531 LOG.debug("log file " + file.getPath() + " size=" + file.getLen()); 532 } 533 } else { 534 LOG.debug("no files under: " + logDir); 535 } 536 } catch (IOException e) { 537 LOG.warn("Unable to dump " + logDir, e); 538 } 539 } 540 541 private static class TestProcEnv { 542 private CountDownLatch latch = null; 543 544 /** 545 * set/unset a latch. every procedure execute() step will wait on the latch if any. 
546 */ 547 public void setWaitLatch(CountDownLatch latch) { 548 this.latch = latch; 549 } 550 551 public void waitOnLatch() throws InterruptedException { 552 if (latch != null) { 553 latch.await(); 554 } 555 } 556 } 557}