001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertTrue; 023 024import java.io.IOException; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.atomic.AtomicBoolean; 027import org.apache.hadoop.fs.FileStatus; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.apache.hadoop.hbase.util.Bytes; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.util.Threads; 038import org.junit.After; 039import org.junit.Before; 040import org.junit.ClassRule; 041import org.junit.Test; 042import org.junit.experimental.categories.Category; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value; 047 048@Category({MasterTests.class, SmallTests.class}) 049public class TestProcedureRecovery { 050 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestProcedureRecovery.class); 054 055 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureRecovery.class); 056 057 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 058 059 private static TestProcEnv procEnv; 060 private static ProcedureExecutor<TestProcEnv> procExecutor; 061 private static ProcedureStore procStore; 062 private static int procSleepInterval; 063 064 private HBaseCommonTestingUtility htu; 065 private FileSystem fs; 066 private Path testDir; 067 private Path logDir; 068 069 @Before 070 public void setUp() throws IOException { 071 htu = new HBaseCommonTestingUtility(); 072 testDir = htu.getDataTestDir(); 073 fs = testDir.getFileSystem(htu.getConfiguration()); 074 assertTrue(testDir.depth() > 1); 075 076 logDir = new Path(testDir, "proc-logs"); 077 procEnv = new TestProcEnv(); 078 procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); 079 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); 080 procExecutor.testing = new ProcedureExecutor.Testing(); 081 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 082 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 083 procSleepInterval = 0; 084 } 085 086 @After 087 public void tearDown() throws IOException { 088 procExecutor.stop(); 089 procStore.stop(false); 090 fs.delete(logDir, true); 091 } 092 093 private void restart() throws Exception { 094 dumpLogDirState(); 095 ProcedureTestingUtility.restart(procExecutor); 096 dumpLogDirState(); 097 } 098 099 public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> { 100 private int step = 0; 101 102 public TestSingleStepProcedure() { } 103 104 @Override 105 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 106 env.waitOnLatch(); 107 LOG.debug("execute procedure " + this + " step=" + step); 108 step++; 109 setResult(Bytes.toBytes(step)); 110 return null; 111 } 112 113 @Override 114 protected void rollback(TestProcEnv env) { } 115 116 @Override 117 protected boolean abort(TestProcEnv env) { return true; } 118 } 119 120 public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> { 121 private AtomicBoolean abort = new AtomicBoolean(false); 122 private int step = 0; 123 124 @Override 125 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 126 env.waitOnLatch(); 127 LOG.debug("execute procedure " + this + " step=" + step); 128 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 129 step++; 130 Threads.sleepWithoutInterrupt(procSleepInterval); 131 if (isAborted()) { 132 setFailure(new RemoteProcedureException(getClass().getName(), 133 new ProcedureAbortedException( 134 "got an abort at " + getClass().getName() + " step=" + step))); 135 return null; 136 } 137 return null; 138 } 139 140 @Override 141 protected void rollback(TestProcEnv env) { 142 LOG.debug("rollback procedure " + this + " step=" + step); 143 ProcedureTestingUtility.toggleKillBeforeStoreUpdate(procExecutor); 144 step++; 145 } 146 147 @Override 148 protected boolean abort(TestProcEnv env) { 149 abort.set(true); 150 return true; 151 } 152 153 private boolean isAborted() { 154 boolean aborted = abort.get(); 155 BaseTestStepProcedure proc = this; 156 while (proc.hasParent() && !aborted) { 157 proc = (BaseTestStepProcedure)procExecutor.getProcedure(proc.getParentProcId()); 158 aborted = proc.isAborted(); 159 } 160 return aborted; 161 } 162 } 163 164 public static class TestMultiStepProcedure extends BaseTestStepProcedure { 165 public TestMultiStepProcedure() { } 166 167 @Override 168 public Procedure[] execute(TestProcEnv env) throws InterruptedException { 169 super.execute(env); 170 return isFailed() ? null : new Procedure[] { new Step1Procedure() }; 171 } 172 173 public static class Step1Procedure extends BaseTestStepProcedure { 174 public Step1Procedure() { } 175 176 @Override 177 protected Procedure[] execute(TestProcEnv env) throws InterruptedException { 178 super.execute(env); 179 return isFailed() ? null : new Procedure[] { new Step2Procedure() }; 180 } 181 } 182 183 public static class Step2Procedure extends BaseTestStepProcedure { 184 public Step2Procedure() { } 185 } 186 } 187 188 @Test 189 public void testNoopLoad() throws Exception { 190 restart(); 191 } 192 193 @Test 194 public void testSingleStepProcRecovery() throws Exception { 195 Procedure proc = new TestSingleStepProcedure(); 196 procExecutor.testing.killBeforeStoreUpdate = true; 197 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 198 assertFalse(procExecutor.isRunning()); 199 procExecutor.testing.killBeforeStoreUpdate = false; 200 201 // Restart and verify that the procedures restart 202 long restartTs = EnvironmentEdgeManager.currentTime(); 203 restart(); 204 waitProcedure(procId); 205 Procedure<?> result = procExecutor.getResult(procId); 206 assertTrue(result.getLastUpdate() > restartTs); 207 ProcedureTestingUtility.assertProcNotFailed(result); 208 assertEquals(1, Bytes.toInt(result.getResult())); 209 long resultTs = result.getLastUpdate(); 210 211 // Verify that after another restart the result is still there 212 restart(); 213 result = procExecutor.getResult(procId); 214 ProcedureTestingUtility.assertProcNotFailed(result); 215 assertEquals(resultTs, result.getLastUpdate()); 216 assertEquals(1, Bytes.toInt(result.getResult())); 217 } 218 219 @Test 220 public void testMultiStepProcRecovery() throws Exception { 221 // Step 0 - kill 222 Procedure proc = new TestMultiStepProcedure(); 223 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 224 assertFalse(procExecutor.isRunning()); 225 226 // Step 0 exec && Step 1 - kill 227 restart(); 228 waitProcedure(procId); 229 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 230 assertFalse(procExecutor.isRunning()); 231 232 // Step 1 exec && step 2 - kill 233 restart(); 234 waitProcedure(procId); 235 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 236 assertFalse(procExecutor.isRunning()); 237 238 // Step 2 exec 239 restart(); 240 waitProcedure(procId); 241 assertTrue(procExecutor.isRunning()); 242 243 // The procedure is completed 244 Procedure<?> result = procExecutor.getResult(procId); 245 ProcedureTestingUtility.assertProcNotFailed(result); 246 } 247 248 @Test 249 public void testMultiStepRollbackRecovery() throws Exception { 250 // Step 0 - kill 251 Procedure proc = new TestMultiStepProcedure(); 252 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 253 assertFalse(procExecutor.isRunning()); 254 255 // Step 0 exec && Step 1 - kill 256 restart(); 257 waitProcedure(procId); 258 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 259 assertFalse(procExecutor.isRunning()); 260 261 // Step 1 exec && step 2 - kill 262 restart(); 263 waitProcedure(procId); 264 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 265 assertFalse(procExecutor.isRunning()); 266 267 // Step 2 exec - rollback - kill 268 procSleepInterval = 2500; 269 restart(); 270 assertTrue(procExecutor.abort(procId)); 271 waitProcedure(procId); 272 assertFalse(procExecutor.isRunning()); 273 274 // rollback - kill 275 restart(); 276 waitProcedure(procId); 277 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 278 assertFalse(procExecutor.isRunning()); 279 280 // rollback - complete 281 restart(); 282 waitProcedure(procId); 283 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 284 assertFalse(procExecutor.isRunning()); 285 286 // Restart the executor and get the result 287 restart(); 288 waitProcedure(procId); 289 290 // The procedure is completed 291 Procedure<?> result = procExecutor.getResult(procId); 292 ProcedureTestingUtility.assertIsAbortException(result); 293 } 294 295 public static class TestStateMachineProcedure 296 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { 297 enum State { STATE_1, STATE_2, STATE_3, DONE } 298 299 public TestStateMachineProcedure() {} 300 301 public TestStateMachineProcedure(final boolean testSubmitChildProc) { 302 this.submitChildProc = testSubmitChildProc; 303 } 304 305 private AtomicBoolean aborted = new AtomicBoolean(false); 306 private int iResult = 0; 307 private boolean submitChildProc = false; 308 309 @Override 310 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) { 311 switch (state) { 312 case STATE_1: 313 LOG.info("execute step 1 " + this); 314 setNextState(State.STATE_2); 315 iResult += 3; 316 break; 317 case STATE_2: 318 LOG.info("execute step 2 " + this); 319 if (submitChildProc) { 320 addChildProcedure(new TestStateMachineProcedure(), new TestStateMachineProcedure()); 321 setNextState(State.DONE); 322 } else { 323 setNextState(State.STATE_3); 324 } 325 iResult += 5; 326 break; 327 case STATE_3: 328 LOG.info("execute step 3 " + this); 329 Threads.sleepWithoutInterrupt(procSleepInterval); 330 if (aborted.get()) { 331 LOG.info("aborted step 3 " + this); 332 setAbortFailure("test", "aborted"); 333 break; 334 } 335 setNextState(State.DONE); 336 iResult += 7; 337 break; 338 case DONE: 339 if (submitChildProc) { 340 addChildProcedure(new TestStateMachineProcedure()); 341 } 342 iResult += 11; 343 setResult(Bytes.toBytes(iResult)); 344 return Flow.NO_MORE_STATE; 345 default: 346 throw new UnsupportedOperationException(); 347 } 348 return Flow.HAS_MORE_STATE; 349 } 350 351 @Override 352 protected void rollbackState(TestProcEnv env, final State state) { 353 switch (state) { 354 case STATE_1: 355 LOG.info("rollback step 1 " + this); 356 break; 357 case STATE_2: 358 LOG.info("rollback step 2 " + this); 359 break; 360 case STATE_3: 361 LOG.info("rollback step 3 " + this); 362 break; 363 default: 364 throw new UnsupportedOperationException(); 365 } 366 } 367 368 @Override 369 protected State getState(final int stateId) { 370 return State.values()[stateId]; 371 } 372 373 @Override 374 protected int getStateId(final State state) { 375 return state.ordinal(); 376 } 377 378 @Override 379 protected State getInitialState() { 380 return State.STATE_1; 381 } 382 383 @Override 384 protected boolean abort(TestProcEnv env) { 385 aborted.set(true); 386 return true; 387 } 388 389 @Override 390 protected void serializeStateData(ProcedureStateSerializer serializer) 391 throws IOException { 392 super.serializeStateData(serializer); 393 Int32Value.Builder builder = Int32Value.newBuilder().setValue(iResult); 394 serializer.serialize(builder.build()); 395 } 396 397 @Override 398 protected void deserializeStateData(ProcedureStateSerializer serializer) 399 throws IOException { 400 super.deserializeStateData(serializer); 401 Int32Value value = serializer.deserialize(Int32Value.class); 402 iResult = value.getValue(); 403 } 404 } 405 406 @Test 407 public void testStateMachineMultipleLevel() throws Exception { 408 long procId = procExecutor.submitProcedure(new TestStateMachineProcedure(true)); 409 // Wait the completion 410 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 411 Procedure<?> result = procExecutor.getResult(procId); 412 ProcedureTestingUtility.assertProcNotFailed(result); 413 assertEquals(19, Bytes.toInt(result.getResult())); 414 assertEquals(4, procExecutor.getLastProcId()); 415 } 416 417 @Test 418 public void testStateMachineRecovery() throws Exception { 419 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); 420 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); 421 422 // Step 1 - kill 423 Procedure proc = new TestStateMachineProcedure(); 424 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 425 assertFalse(procExecutor.isRunning()); 426 427 // Step 1 exec && Step 2 - kill 428 restart(); 429 waitProcedure(procId); 430 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 431 assertFalse(procExecutor.isRunning()); 432 433 // Step 2 exec && step 3 - kill 434 restart(); 435 waitProcedure(procId); 436 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 437 assertFalse(procExecutor.isRunning()); 438 439 // Step 3 exec 440 restart(); 441 waitProcedure(procId); 442 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 443 assertFalse(procExecutor.isRunning()); 444 445 restart(); 446 waitProcedure(procId); 447 assertTrue(procExecutor.isRunning()); 448 449 // The procedure is completed 450 Procedure<?> result = procExecutor.getResult(procId); 451 ProcedureTestingUtility.assertProcNotFailed(result); 452 assertEquals(26, Bytes.toInt(result.getResult())); 453 } 454 455 @Test 456 public void testStateMachineRollbackRecovery() throws Exception { 457 ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, true); 458 ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, true); 459 460 // Step 1 - kill 461 Procedure proc = new TestStateMachineProcedure(); 462 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 463 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 464 assertFalse(procExecutor.isRunning()); 465 466 // Step 1 exec && Step 2 - kill 467 restart(); 468 waitProcedure(procId); 469 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 470 assertFalse(procExecutor.isRunning()); 471 472 // Step 2 exec && step 3 - kill 473 restart(); 474 waitProcedure(procId); 475 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 476 assertFalse(procExecutor.isRunning()); 477 478 // Step 3 exec - rollback step 3 - kill 479 procSleepInterval = 2500; 480 restart(); 481 assertTrue(procExecutor.abort(procId)); 482 waitProcedure(procId); 483 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 484 assertFalse(procExecutor.isRunning()); 485 486 // Rollback step 3 - rollback step 2 - kill 487 restart(); 488 waitProcedure(procId); 489 assertFalse(procExecutor.isRunning()); 490 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 491 492 // Rollback step 2 - step 1 - kill 493 restart(); 494 waitProcedure(procId); 495 assertFalse(procExecutor.isRunning()); 496 ProcedureTestingUtility.assertProcNotYetCompleted(procExecutor, procId); 497 498 // Rollback step 1 - complete 499 restart(); 500 waitProcedure(procId); 501 assertTrue(procExecutor.isRunning()); 502 503 // The procedure is completed 504 Procedure<?> result = procExecutor.getResult(procId); 505 ProcedureTestingUtility.assertIsAbortException(result); 506 } 507 508 private void waitProcedure(final long procId) { 509 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 510 dumpLogDirState(); 511 } 512 513 private void dumpLogDirState() { 514 try { 515 FileStatus[] files = fs.listStatus(logDir); 516 if (files != null && files.length > 0) { 517 for (FileStatus file: files) { 518 assertTrue(file.toString(), file.isFile()); 519 LOG.debug("log file " + file.getPath() + " size=" + file.getLen()); 520 } 521 } else { 522 LOG.debug("no files under: " + logDir); 523 } 524 } catch (IOException e) { 525 LOG.warn("Unable to dump " + logDir, e); 526 } 527 } 528 529 private static class TestProcEnv { 530 private CountDownLatch latch = null; 531 532 /** 533 * set/unset a latch. every procedure execute() step will wait on the latch if any. 534 */ 535 public void setWaitLatch(CountDownLatch latch) { 536 this.latch = latch; 537 } 538 539 public void waitOnLatch() throws InterruptedException { 540 if (latch != null) { 541 latch.await(); 542 } 543 } 544 } 545}