/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
045 046import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value; 047 048@Category({ MasterTests.class, MediumTests.class }) 049public class TestProcedureRecovery { 050 @ClassRule 051 public static final HBaseClassTestRule CLASS_RULE = 052 HBaseClassTestRule.forClass(TestProcedureRecovery.class); 053 054 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class); 055 056 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 057 058 private static TestProcEnv procEnv; 059 private static ProcedureExecutor<TestProcEnv> procExecutor; 060 private static ProcedureStore procStore; 061 private static int procSleepInterval; 062 063 private HBaseCommonTestingUtility htu; 064 private FileSystem fs; 065 private Path testDir; 066 private Path logDir; 067 068 @Before 069 public void setUp() throws IOException { 070 htu = new HBaseCommonTestingUtility(); 071 testDir = htu.getDataTestDir(); 072 fs = testDir.getFileSystem(htu.getConfiguration()); 073 assertTrue(testDir.depth() > 1); 074 075 logDir = new Path(testDir, "proc-logs"); 076 procEnv = new TestProcEnv(); 077 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); 078 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 079 procExecutor.testing = new ProcedureExecutor.Testing(); 080 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 081 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 082 procSleepInterval = 0; 083 } 084 085 @After 086 public void tearDown() throws IOException { 087 procExecutor.stop(); 088 procStore.stop(false); 089 fs.delete(logDir, true); 090 } 091 092 private void restart() throws Exception { 093 dumpLogDirState(); 094 ProcedureTestingUtility.restart(procExecutor); 095 dumpLogDirState(); 096 } 097 098 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { 099 private int step = 0; 100 101 public TestSingleStepProcedure() { 102 } 103 104 @Override 
105 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 106 env.waitOnLatch(); 107 LOG.debug("execute procedure " + this + " step=" + step); 108 step++; 109 setResult(Bytes.toBytes(step)); 110 return null; 111 } 112 113 @Override 114 protected void rollback(TestProcEnv env) { 115 } 116 117 @Override 118 protected boolean abort(TestProcEnv env) { 119 return true; 120 } 121 } 122 123 public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> { 124 private AtomicBoolean abort = new AtomicBoolean(false); 125 private int step = 0; 126 127 @Override 128 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 129 env.waitOnLatch(); 130 LOG.debug("execute procedure " + this + " step=" + step); 131 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 132 step++; 133 Threads.sleepWithoutInterrupt(procSleepInterval); 134 if (isAborted()) { 135 setFailure(new RemoteProcedureException(getClass().getName(), new ProcedureAbortedException( 136 "got an abort at " + getClass().getName() + " step=" + step))); 137 return null; 138 } 139 return null; 140 } 141 142 @Override 143 protected void rollback(TestProcEnv env) { 144 LOG.debug("rollback procedure " + this + " step=" + step); 145 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 146 step++; 147 } 148 149 @Override 150 protected boolean abort(TestProcEnv env) { 151 abort.set(true); 152 return true; 153 } 154 155 private boolean isAborted() { 156 boolean aborted = abort.get(); 157 BaseTestStepProcedure proc = this; 158 while (proc.hasParent() && !aborted) { 159 proc = (BaseTestStepProcedure) procExecutor.getProcedure(proc.getParentProcId()); 160 aborted = proc.isAborted(); 161 } 162 return aborted; 163 } 164 } 165 166 public static class TestMultiStepProcedure extends BaseTestStepProcedure { 167 public TestMultiStepProcedure() { 168 } 169 170 @Override 171 public Procedure[] execute(TestProcEnv env) throws InterruptedException { 172 
super.execute(env); 173 return isFailed() ? null : new Procedure[] { new Step1Procedure() }; 174 } 175 176 public static class Step1Procedure extends BaseTestStepProcedure { 177 public Step1Procedure() { 178 } 179 180 @Override 181 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 182 super.execute(env); 183 return isFailed() ? null : new Procedure[] { new Step2Procedure() }; 184 } 185 } 186 187 public static class Step2Procedure extends BaseTestStepProcedure { 188 public Step2Procedure() { 189 } 190 } 191 } 192 193 @Test 194 public void testNoopLoad() throws Exception { 195 restart(); 196 } 197 198 @Test 199 public void testSingleStepProcRecovery() throws Exception { 200 Procedure proc = new TestSingleStepProcedure(); 201 procExecutor.testing.killBeforeStoreUpdate = true; 202 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 203 assertFalse(procExecutor.isRunning()); 204 procExecutor.testing.killBeforeStoreUpdate = false; 205 206 // Restart and verify that the procedures restart 207 long restartTs = EnvironmentEdgeManager.currentTime(); 208 restart(); 209 waitProcedure(procId); 210 Procedure<?> result = procExecutor.getResult(procId); 211 assertTrue(result.getLastUpdate() > restartTs); 212 ProcedureTestingUtility.assertProcNotFailed(result); 213 assertEquals(1, Bytes.toInt(result.getResult())); 214 long resultTs = result.getLastUpdate(); 215 216 // Verify that after another restart the result is still there 217 restart(); 218 result = procExecutor.getResult(procId); 219 ProcedureTestingUtility.assertProcNotFailed(result); 220 assertEquals(resultTs, result.getLastUpdate()); 221 assertEquals(1, Bytes.toInt(result.getResult())); 222 } 223 224 @Test 225 public void testMultiStepProcRecovery() throws Exception { 226 // Step 0 - kill 227 Procedure proc = new TestMultiStepProcedure(); 228 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 229 assertFalse(procExecutor.isRunning()); 230 231 // Step 0 
exec && Step 1 - kill 232 restart(); 233 waitProcedure(procId); 234 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 235 assertFalse(procExecutor.isRunning()); 236 237 // Step 1 exec && step 2 - kill 238 restart(); 239 waitProcedure(procId); 240 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 241 assertFalse(procExecutor.isRunning()); 242 243 // Step 2 exec 244 restart(); 245 waitProcedure(procId); 246 assertTrue(procExecutor.isRunning()); 247 248 // The procedure is completed 249 Procedure<?> result = procExecutor.getResult(procId); 250 ProcedureTestingUtility.assertProcNotFailed(result); 251 } 252 253 @Test 254 public void testMultiStepRollbackRecovery() throws Exception { 255 // Step 0 - kill 256 Procedure proc = new TestMultiStepProcedure(); 257 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 258 assertFalse(procExecutor.isRunning()); 259 260 // Step 0 exec && Step 1 - kill 261 restart(); 262 waitProcedure(procId); 263 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 264 assertFalse(procExecutor.isRunning()); 265 266 // Step 1 exec && step 2 - kill 267 restart(); 268 waitProcedure(procId); 269 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 270 assertFalse(procExecutor.isRunning()); 271 272 // Step 2 exec - rollback - kill 273 procSleepInterval = 2500; 274 restart(); 275 assertTrue(procExecutor.abort(procId)); 276 waitProcedure(procId); 277 assertFalse(procExecutor.isRunning()); 278 279 // rollback - kill 280 restart(); 281 waitProcedure(procId); 282 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 283 assertFalse(procExecutor.isRunning()); 284 285 // rollback - complete 286 restart(); 287 waitProcedure(procId); 288 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 289 assertFalse(procExecutor.isRunning()); 290 291 // Restart the executor and get the result 292 restart(); 293 
waitProcedure(procId); 294 295 // The procedure is completed 296 Procedure<?> result = procExecutor.getResult(procId); 297 ProcedureTestingUtility.assertIsAbortException(result); 298 } 299 300 public static class TestStateMachineProcedure 301 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { 302 enum State { 303 STATE_1, 304 STATE_2, 305 STATE_3, 306 DONE 307 } 308 309 public TestStateMachineProcedure() { 310 } 311 312 public TestStateMachineProcedure(final boolean testSubmitChildProc) { 313 this.submitChildProc = testSubmitChildProc; 314 } 315 316 private AtomicBoolean aborted = new AtomicBoolean(false); 317 private int iResult = 0; 318 private boolean submitChildProc = false; 319 320 @Override 321 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { 322 switch (state) { 323 case STATE_1: 324 LOG.info("execute step 1 " + this); 325 setNextState(State.STATE_2); 326 iResult += 3; 327 break; 328 case STATE_2: 329 LOG.info("execute step 2 " + this); 330 if (submitChildProc) { 331 addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure()); 332 setNextState(State.DONE); 333 } else { 334 setNextState(State.STATE_3); 335 } 336 iResult += 5; 337 break; 338 case STATE_3: 339 LOG.info("execute step 3 " + this); 340 Threads.sleepWithoutInterrupt(procSleepInterval); 341 if (aborted.get()) { 342 LOG.info("aborted step 3 " + this); 343 setAbortFailure("test", "aborted"); 344 break; 345 } 346 setNextState(State.DONE); 347 iResult += 7; 348 break; 349 case DONE: 350 if (submitChildProc) { 351 addChildProcedure(new TestStateMachineProcedure()); 352 } 353 iResult += 11; 354 setResult(Bytes.toBytes(iResult)); 355 return Flow.NO_MORE_STATE; 356 default: 357 throw new UnsupportedOperationException(); 358 } 359 return Flow.HAS_MORE_STATE; 360 } 361 362 @Override 363 protected void rollbackState(TestProcEnv env, final State state) { 364 switch (state) { 365 case STATE_1: 366 LOG.info("rollback step 1 " 
+ this); 367 break; 368 case STATE_2: 369 LOG.info("rollback step 2 " + this); 370 break; 371 case STATE_3: 372 LOG.info("rollback step 3 " + this); 373 break; 374 default: 375 throw new UnsupportedOperationException(); 376 } 377 } 378 379 @Override 380 protected State getState(final int stateId) { 381 return State.values()[stateId]; 382 } 383 384 @Override 385 protected int getStateId(final State state) { 386 return state.ordinal(); 387 } 388 389 @Override 390 protected State getInitialState() { 391 return State.STATE_1; 392 } 393 394 @Override 395 protected boolean abort(TestProcEnv env) { 396 aborted.set(true); 397 return true; 398 } 399 400 @Override 401 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 402 super.serializeStateData(serializer); 403 Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult); 404 serializer.serialize(builder.build()); 405 } 406 407 @Override 408 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 409 super.deserializeStateData(serializer); 410 Int32Value value = serializer.deserialize(Int32Value.class); 411 iResult = value.getValue(); 412 } 413 } 414 415 @Test 416 public void testStateMachineMultipleLevel() throws Exception { 417 long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true)); 418 // Wait the completion 419 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 420 Procedure<?> result = procExecutor.getResult(procId); 421 ProcedureTestingUtility.assertProcNotFailed(result); 422 assertEquals(19, Bytes.toInt(result.getResult())); 423 assertEquals(4, procExecutor.getLastProcId()); 424 } 425 426 @Test 427 public void testStateMachineRecovery() throws Exception { 428 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); 429 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); 430 431 // Step 1 - kill 432 Procedure proc = new TestStateMachineProcedure(); 433 long 
procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 434 assertFalse(procExecutor.isRunning()); 435 436 // Step 1 exec && Step 2 - kill 437 restart(); 438 waitProcedure(procId); 439 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 440 assertFalse(procExecutor.isRunning()); 441 442 // Step 2 exec && step 3 - kill 443 restart(); 444 waitProcedure(procId); 445 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 446 assertFalse(procExecutor.isRunning()); 447 448 // Step 3 exec 449 restart(); 450 waitProcedure(procId); 451 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 452 assertFalse(procExecutor.isRunning()); 453 454 restart(); 455 waitProcedure(procId); 456 assertTrue(procExecutor.isRunning()); 457 458 // The procedure is completed 459 Procedure<?> result = procExecutor.getResult(procId); 460 ProcedureTestingUtility.assertProcNotFailed(result); 461 assertEquals(26, Bytes.toInt(result.getResult())); 462 } 463 464 @Test 465 public void testStateMachineRollbackRecovery() throws Exception { 466 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); 467 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); 468 469 // Step 1 - kill 470 Procedure proc = new TestStateMachineProcedure(); 471 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 472 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 473 assertFalse(procExecutor.isRunning()); 474 475 // Step 1 exec && Step 2 - kill 476 restart(); 477 waitProcedure(procId); 478 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 479 assertFalse(procExecutor.isRunning()); 480 481 // Step 2 exec && step 3 - kill 482 restart(); 483 waitProcedure(procId); 484 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 485 assertFalse(procExecutor.isRunning()); 486 487 // Step 3 exec - rollback step 3 - kill 488 procSleepInterval = 2500; 
489 restart(); 490 assertTrue(procExecutor.abort(procId)); 491 waitProcedure(procId); 492 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 493 assertFalse(procExecutor.isRunning()); 494 495 // Rollback step 3 - rollback step 2 - kill 496 restart(); 497 waitProcedure(procId); 498 assertFalse(procExecutor.isRunning()); 499 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 500 501 // Rollback step 2 - step 1 - kill 502 restart(); 503 waitProcedure(procId); 504 assertFalse(procExecutor.isRunning()); 505 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 506 507 // Rollback step 1 - complete 508 restart(); 509 waitProcedure(procId); 510 assertTrue(procExecutor.isRunning()); 511 512 // The procedure is completed 513 Procedure<?> result = procExecutor.getResult(procId); 514 ProcedureTestingUtility.assertIsAbortException(result); 515 } 516 517 private void waitProcedure(final long procId) { 518 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 519 dumpLogDirState(); 520 } 521 522 private void dumpLogDirState() { 523 try { 524 FileStatus[] files = fs.listStatus(logDir); 525 if (files != null && files.length > 0) { 526 for (FileStatus file : files) { 527 assertTrue(file.toString(), file.isFile()); 528 LOG.debug("log file " + file.getPath() + " size=" + file.getLen()); 529 } 530 } else { 531 LOG.debug("no files under: " + logDir); 532 } 533 } catch (IOException e) { 534 LOG.warn("Unable to dump " + logDir, e); 535 } 536 } 537 538 private static class TestProcEnv { 539 private CountDownLatch latch = null; 540 541 /** 542 * set/unset a latch. every procedure execute() step will wait on the latch if any. 543 */ 544 public void setWaitLatch(CountDownLatch latch) { 545 this.latch = latch; 546 } 547 548 public void waitOnLatch() throws InterruptedException { 549 if (latch != null) { 550 latch.await(); 551 } 552 } 553 } 554}