001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.atomic.AtomicBoolean; 027import org.apache.hadoop.fs.FileStatus; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.util.Threads; 038import org.junit.After; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value; 047 048@Category({MasterTests.class, MediumTests.class}) 049public class TestProcedureRecovery { 050 @ClassRule 051 public static final HBaseClassTestRule CLASS_RULE = 052 HBaseClassTestRule.forClass(TestProcedureRecovery.class); 053 054 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class); 055 056 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 057 058 private static TestProcEnv procEnv; 059 private static ProcedureExecutor<TestProcEnv> procExecutor; 060 private static ProcedureStore procStore; 061 private static int procSleepInterval; 062 063 private HBaseCommonTestingUtility htu; 064 private FileSystem fs; 065 private Path testDir; 066 private Path logDir; 067 068 @Before 069 public void setUp() throws IOException { 070 htu = new HBaseCommonTestingUtility(); 071 testDir = htu.getDataTestDir(); 072 fs = testDir.getFileSystem(htu.getConfiguration()); 073 assertTrue(testDir.depth() > 1); 074 075 logDir = new Path(testDir, "proc-logs"); 076 procEnv = new TestProcEnv(); 077 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); 078 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 079 procExecutor.testing = new ProcedureExecutor.Testing(); 080 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 081 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 082 procSleepInterval = 0; 083 } 084 085 @After 086 public void tearDown() throws IOException { 087 procExecutor.stop(); 088 procStore.stop(false); 089 fs.delete(logDir, true); 090 } 091 092 private void restart() throws Exception { 093 dumpLogDirState(); 094 ProcedureTestingUtility.restart(procExecutor); 095 dumpLogDirState(); 096 } 097 098 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { 099 private int step = 0; 100 101 public TestSingleStepProcedure() { } 102 103 @Override 104 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 105 env.waitOnLatch(); 106 LOG.debug("execute procedure " + this + " step=" + step); 107 step++; 108 setResult(Bytes.toBytes(step)); 109 return null; 110 } 111 112 @Override 113 protected void rollback(TestProcEnv env) { } 114 115 @Override 116 protected boolean abort(TestProcEnv env) { 117 return true; 118 } 119 } 120 121 public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> { 122 private AtomicBoolean abort = new AtomicBoolean(false); 123 private int step = 0; 124 125 @Override 126 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 127 env.waitOnLatch(); 128 LOG.debug("execute procedure " + this + " step=" + step); 129 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 130 step++; 131 Threads.sleepWithoutInterrupt(procSleepInterval); 132 if (isAborted()) { 133 setFailure(new RemoteProcedureException(getClass().getName(), 134 new ProcedureAbortedException( 135 "got an abort at " + getClass().getName() + " step=" + step))); 136 return null; 137 } 138 return null; 139 } 140 141 @Override 142 protected void rollback(TestProcEnv env) { 143 LOG.debug("rollback procedure " + this + " step=" + step); 144 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 145 step++; 146 } 147 148 @Override 149 protected boolean abort(TestProcEnv env) { 150 abort.set(true); 151 return true; 152 } 153 154 private boolean isAborted() { 155 boolean aborted = abort.get(); 156 BaseTestStepProcedure proc = this; 157 while (proc.hasParent() && !aborted) { 158 proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId()); 159 aborted = proc.isAborted(); 160 } 161 return aborted; 162 } 163 } 164 165 public static class TestMultiStepProcedure extends BaseTestStepProcedure { 166 public TestMultiStepProcedure() { } 167 168 @Override 169 public Procedure[] execute(TestProcEnv env) throws InterruptedException { 170 super.execute(env); 171 return isFailed() ? null : new Procedure[] { new Step1Procedure() }; 172 } 173 174 public static class Step1Procedure extends BaseTestStepProcedure { 175 public Step1Procedure() { } 176 177 @Override 178 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 179 super.execute(env); 180 return isFailed() ? null : new Procedure[] { new Step2Procedure() }; 181 } 182 } 183 184 public static class Step2Procedure extends BaseTestStepProcedure { 185 public Step2Procedure() { } 186 } 187 } 188 189 @Test 190 public void testNoopLoad() throws Exception { 191 restart(); 192 } 193 194 @Test 195 public void testSingleStepProcRecovery() throws Exception { 196 Procedure proc = new TestSingleStepProcedure(); 197 procExecutor.testing.killBeforeStoreUpdate = true; 198 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 199 assertFalse(procExecutor.isRunning()); 200 procExecutor.testing.killBeforeStoreUpdate = false; 201 202 // Restart and verify that the procedures restart 203 long restartTs = EnvironmentEdgeManager.currentTime(); 204 restart(); 205 waitProcedure(procId); 206 Procedure<?> result = procExecutor.getResult(procId); 207 assertTrue(result.getLastUpdate() > restartTs); 208 ProcedureTestingUtility.assertProcNotFailed(result); 209 assertEquals(1, Bytes.toInt(result.getResult())); 210 long resultTs = result.getLastUpdate(); 211 212 // Verify that after another restart the result is still there 213 restart(); 214 result = procExecutor.getResult(procId); 215 ProcedureTestingUtility.assertProcNotFailed(result); 216 assertEquals(resultTs, result.getLastUpdate()); 217 assertEquals(1, Bytes.toInt(result.getResult())); 218 } 219 220 @Test 221 public void testMultiStepProcRecovery() throws Exception { 222 // Step 0 - kill 223 Procedure proc = new TestMultiStepProcedure(); 224 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 225 assertFalse(procExecutor.isRunning()); 226 227 // Step 0 exec && Step 1 - kill 228 restart(); 229 waitProcedure(procId); 230 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 231 assertFalse(procExecutor.isRunning()); 232 233 // Step 1 exec && step 2 - kill 234 restart(); 235 waitProcedure(procId); 236 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 237 assertFalse(procExecutor.isRunning()); 238 239 // Step 2 exec 240 restart(); 241 waitProcedure(procId); 242 assertTrue(procExecutor.isRunning()); 243 244 // The procedure is completed 245 Procedure<?> result = procExecutor.getResult(procId); 246 ProcedureTestingUtility.assertProcNotFailed(result); 247 } 248 249 @Test 250 public void testMultiStepRollbackRecovery() throws Exception { 251 // Step 0 - kill 252 Procedure proc = new TestMultiStepProcedure(); 253 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 254 assertFalse(procExecutor.isRunning()); 255 256 // Step 0 exec && Step 1 - kill 257 restart(); 258 waitProcedure(procId); 259 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 260 assertFalse(procExecutor.isRunning()); 261 262 // Step 1 exec && step 2 - kill 263 restart(); 264 waitProcedure(procId); 265 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 266 assertFalse(procExecutor.isRunning()); 267 268 // Step 2 exec - rollback - kill 269 procSleepInterval = 2500; 270 restart(); 271 assertTrue(procExecutor.abort(procId)); 272 waitProcedure(procId); 273 assertFalse(procExecutor.isRunning()); 274 275 // rollback - kill 276 restart(); 277 waitProcedure(procId); 278 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 279 assertFalse(procExecutor.isRunning()); 280 281 // rollback - complete 282 restart(); 283 waitProcedure(procId); 284 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 285 assertFalse(procExecutor.isRunning()); 286 287 // Restart the executor and get the result 288 restart(); 289 waitProcedure(procId); 290 291 // The procedure is completed 292 Procedure<?> result = procExecutor.getResult(procId); 293 ProcedureTestingUtility.assertIsAbortException(result); 294 } 295 296 public static class TestStateMachineProcedure 297 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { 298 enum State { STATE_1, STATE_2, STATE_3, DONE } 299 300 public TestStateMachineProcedure() {} 301 302 public TestStateMachineProcedure(final boolean testSubmitChildProc) { 303 this.submitChildProc = testSubmitChildProc; 304 } 305 306 private AtomicBoolean aborted = new AtomicBoolean(false); 307 private int iResult = 0; 308 private boolean submitChildProc = false; 309 310 @Override 311 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { 312 switch (state) { 313 case STATE_1: 314 LOG.info("execute step 1 " + this); 315 setNextState(State.STATE_2); 316 iResult += 3; 317 break; 318 case STATE_2: 319 LOG.info("execute step 2 " + this); 320 if (submitChildProc) { 321 addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure()); 322 setNextState(State.DONE); 323 } else { 324 setNextState(State.STATE_3); 325 } 326 iResult += 5; 327 break; 328 case STATE_3: 329 LOG.info("execute step 3 " + this); 330 Threads.sleepWithoutInterrupt(procSleepInterval); 331 if (aborted.get()) { 332 LOG.info("aborted step 3 " + this); 333 setAbortFailure("test", "aborted"); 334 break; 335 } 336 setNextState(State.DONE); 337 iResult += 7; 338 break; 339 case DONE: 340 if (submitChildProc) { 341 addChildProcedure(new TestStateMachineProcedure()); 342 } 343 iResult += 11; 344 setResult(Bytes.toBytes(iResult)); 345 return Flow.NO_MORE_STATE; 346 default: 347 throw new UnsupportedOperationException(); 348 } 349 return Flow.HAS_MORE_STATE; 350 } 351 352 @Override 353 protected void rollbackState(TestProcEnv env, final State state) { 354 switch (state) { 355 case STATE_1: 356 LOG.info("rollback step 1 " + this); 357 break; 358 case STATE_2: 359 LOG.info("rollback step 2 " + this); 360 break; 361 case STATE_3: 362 LOG.info("rollback step 3 " + this); 363 break; 364 default: 365 throw new UnsupportedOperationException(); 366 } 367 } 368 369 @Override 370 protected State getState(final int stateId) { 371 return State.values()[stateId]; 372 } 373 374 @Override 375 protected int getStateId(final State state) { 376 return state.ordinal(); 377 } 378 379 @Override 380 protected State getInitialState() { 381 return State.STATE_1; 382 } 383 384 @Override 385 protected boolean abort(TestProcEnv env) { 386 aborted.set(true); 387 return true; 388 } 389 390 @Override 391 protected void serializeStateData(ProcedureStateSerializer serializer) 392 throws IOException { 393 super.serializeStateData(serializer); 394 Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult); 395 serializer.serialize(builder.build()); 396 } 397 398 @Override 399 protected void deserializeStateData(ProcedureStateSerializer serializer) 400 throws IOException { 401 super.deserializeStateData(serializer); 402 Int32Value value = serializer.deserialize(Int32Value.class); 403 iResult = value.getValue(); 404 } 405 } 406 407 @Test 408 public void testStateMachineMultipleLevel() throws Exception { 409 long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true)); 410 // Wait the completion 411 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 412 Procedure<?> result = procExecutor.getResult(procId); 413 ProcedureTestingUtility.assertProcNotFailed(result); 414 assertEquals(19, Bytes.toInt(result.getResult())); 415 assertEquals(4, procExecutor.getLastProcId()); 416 } 417 418 @Test 419 public void testStateMachineRecovery() throws Exception { 420 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); 421 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); 422 423 // Step 1 - kill 424 Procedure proc = new TestStateMachineProcedure(); 425 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 426 assertFalse(procExecutor.isRunning()); 427 428 // Step 1 exec && Step 2 - kill 429 restart(); 430 waitProcedure(procId); 431 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 432 assertFalse(procExecutor.isRunning()); 433 434 // Step 2 exec && step 3 - kill 435 restart(); 436 waitProcedure(procId); 437 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 438 assertFalse(procExecutor.isRunning()); 439 440 // Step 3 exec 441 restart(); 442 waitProcedure(procId); 443 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 444 assertFalse(procExecutor.isRunning()); 445 446 restart(); 447 waitProcedure(procId); 448 assertTrue(procExecutor.isRunning()); 449 450 // The procedure is completed 451 Procedure<?> result = procExecutor.getResult(procId); 452 ProcedureTestingUtility.assertProcNotFailed(result); 453 assertEquals(26, Bytes.toInt(result.getResult())); 454 } 455 456 @Test 457 public void testStateMachineRollbackRecovery() throws Exception { 458 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); 459 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); 460 461 // Step 1 - kill 462 Procedure proc = new TestStateMachineProcedure(); 463 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 464 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 465 assertFalse(procExecutor.isRunning()); 466 467 // Step 1 exec && Step 2 - kill 468 restart(); 469 waitProcedure(procId); 470 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 471 assertFalse(procExecutor.isRunning()); 472 473 // Step 2 exec && step 3 - kill 474 restart(); 475 waitProcedure(procId); 476 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 477 assertFalse(procExecutor.isRunning()); 478 479 // Step 3 exec - rollback step 3 - kill 480 procSleepInterval = 2500; 481 restart(); 482 assertTrue(procExecutor.abort(procId)); 483 waitProcedure(procId); 484 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 485 assertFalse(procExecutor.isRunning()); 486 487 // Rollback step 3 - rollback step 2 - kill 488 restart(); 489 waitProcedure(procId); 490 assertFalse(procExecutor.isRunning()); 491 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 492 493 // Rollback step 2 - step 1 - kill 494 restart(); 495 waitProcedure(procId); 496 assertFalse(procExecutor.isRunning()); 497 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 498 499 // Rollback step 1 - complete 500 restart(); 501 waitProcedure(procId); 502 assertTrue(procExecutor.isRunning()); 503 504 // The procedure is completed 505 Procedure<?> result = procExecutor.getResult(procId); 506 ProcedureTestingUtility.assertIsAbortException(result); 507 } 508 509 private void waitProcedure(final long procId) { 510 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 511 dumpLogDirState(); 512 } 513 514 private void dumpLogDirState() { 515 try { 516 FileStatus[] files = fs.listStatus(logDir); 517 if (files != null && files.length > 0) { 518 for (FileStatus file: files) { 519 assertTrue(file.toString(), file.isFile()); 520 LOG.debug("log file " + file.getPath() + " size=" + file.getLen()); 521 } 522 } else { 523 LOG.debug("no files under: " + logDir); 524 } 525 } catch (IOException e) { 526 LOG.warn("Unable to dump " + logDir, e); 527 } 528 } 529 530 private static class TestProcEnv { 531 private CountDownLatch latch = null; 532 533 /** 534 * set/unset a latch. every procedure execute() step will wait on the latch if any. 535 */ 536 public void setWaitLatch(CountDownLatch latch) { 537 this.latch = latch; 538 } 539 540 public void waitOnLatch() throws InterruptedException { 541 if (latch != null) { 542 latch.await(); 543 } 544 } 545 } 546}