/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;

/**
 * Tests execution, rollback and timeout handling of procedures submitted to a
 * {@link ProcedureExecutor}.
 * <p>
 * The helper procedures defined here append {@code "<name>-<step>"} markers to a list shared with
 * the test method, so each test can assert exactly which steps ran and in which order.
 */
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureExecution {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestProcedureExecution.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecution.class);

  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;

  /** Timeout, in milliseconds, set on the waiting procedures of the abort-timeout tests. */
  private static final int PROC_TIMEOUT_MSEC = 2500;

  /** A typed {@code null}, used to smuggle a null entry into a sub-procedure varargs list. */
  private static final Procedure<?> NULL_PROC = null;

  private ProcedureExecutor<Void> procExecutor;
  private ProcedureStore procStore;

  private HBaseCommonTestingUtility htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /**
   * Creates a WAL-backed procedure store under the test data directory and starts an executor
   * on top of it.
   *
   * @throws IOException if the file system, the store or the executor cannot be set up
   */
  @Before
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtility();
    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
  }

  /**
   * Stops the executor and the store, then removes the procedure log directory.
   * <p>
   * JUnit runs this method even when {@link #setUp()} threw part-way through, so every fixture is
   * null-checked; otherwise a NullPointerException raised here would be reported on top of the
   * real set-up failure.
   *
   * @throws IOException if the log directory cannot be deleted
   */
  @After
  public void tearDown() throws IOException {
    try {
      if (procExecutor != null) {
        procExecutor.stop();
      }
      if (procStore != null) {
        procStore.stop(false);
      }
    } finally {
      // Always attempt the cleanup, even if stopping one of the components failed.
      if (fs != null && logDir != null) {
        fs.delete(logDir, true);
      }
    }
  }

  /** Marker exception used to recognise failures injected by these tests. */
  private static class TestProcedureException extends IOException {

    private static final long serialVersionUID = 8798565784658913798L;

    public TestProcedureException(String msg) {
      super(msg);
    }
  }

  /**
   * Procedure that records its execute/rollback/abort calls in a shared list and then either
   * returns a fixed set of sub-procedures or marks itself as failed.
   */
  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
    private final Procedure<Void>[] subProcs;
    private final List<String> state;
    private final Exception failure;
    private final String name;

    /** No-arg constructor; it always throws, as these tests never expect it to be invoked. */
    public TestSequentialProcedure() {
      throw new UnsupportedOperationException("recovery should not be triggered here");
    }

    /**
     * Creates a procedure that succeeds and returns {@code subProcs} from {@code execute}.
     *
     * @param name prefix of the markers appended to {@code state}
     * @param state shared list receiving the step markers
     * @param subProcs sub-procedures returned by {@code execute}; may contain {@code null}
     */
    // The raw varargs type avoids a generic-array warning at every call site; the array is only
    // handed back to the executor, never written to.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
      this.state = state;
      this.subProcs = subProcs;
      this.name = name;
      this.failure = null;
    }

    /**
     * Creates a procedure whose {@code execute} marks it as failed with the given cause.
     *
     * @param name prefix of the markers appended to {@code state}
     * @param state shared list receiving the step markers
     * @param failure cause wrapped in the {@link RemoteProcedureException} set on failure
     */
    public TestSequentialProcedure(String name, List<String> state, Exception failure) {
      this.state = state;
      this.subProcs = null;
      this.name = name;
      this.failure = failure;
    }

    @Override
    protected Procedure<Void>[] execute(Void env) {
      state.add(name + "-execute");
      if (failure != null) {
        setFailure(new RemoteProcedureException(name + "-failure", failure));
        return null;
      }
      return subProcs;
    }

    @Override
    protected void rollback(Void env) {
      state.add(name + "-rollback");
    }

    @Override
    protected boolean abort(Void env) {
      state.add(name + "-abort");
      return true;
    }
  }

  /**
   * A sub-procedure list containing a {@code null} entry must fail the root procedure with an
   * illegal-argument error and roll back the procedures that already executed.
   */
  @Test
  public void testBadSubprocList() {
    List<String> state = new ArrayList<>();
    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);

    // subProc1 has a "null" subprocedure which is caught as InvalidArgument
    // failed state with 2 execute and 2 rollback
    LOG.info(Objects.toString(state));
    Procedure<?> result = procExecutor.getResult(rootId);
    assertTrue(state.toString(), result.isFailed());
    ProcedureTestingUtility.assertIsIllegalArgumentException(result);

    assertEquals(state.toString(), 4, state.size());
    assertEquals("rootProc-execute", state.get(0));
    assertEquals("subProc1-execute", state.get(1));
    assertEquals("subProc1-rollback", state.get(2));
    assertEquals("rootProc-rollback", state.get(3));
  }

  /** A three-level chain of successful procedures executes each level once, parent first. */
  @Test
  public void testSingleSequentialProc() {
    List<String> state = new ArrayList<>();
    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);

    // successful state, with 3 execute
    LOG.info(Objects.toString(state));
    Procedure<?> result = procExecutor.getResult(rootId);
    ProcedureTestingUtility.assertProcNotFailed(result);
    assertEquals(state.toString(), 3, state.size());
    // A child only exists once its parent's execute() has returned it, so the order is fixed.
    assertEquals("rootProc-execute", state.get(0));
    assertEquals("subProc1-execute", state.get(1));
    assertEquals("subProc2-execute", state.get(2));
  }

  /**
   * When the innermost procedure of a three-level chain fails, all three are rolled back in
   * reverse order of execution and the injected exception is reported as the cause.
   */
  @Test
  public void testSingleSequentialProcRollback() {
    List<String> state = new ArrayList<>();
    Procedure<Void> subProc2 =
        new TestSequentialProcedure("subProc2", state, new TestProcedureException("fail test"));
    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);

    // the 3rd proc fails, rollback after 2 successful executions
    LOG.info(Objects.toString(state));
    Procedure<?> result = procExecutor.getResult(rootId);
    assertTrue(state.toString(), result.isFailed());
    LOG.info(result.getException().getMessage());
    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
    assertTrue("expected TestProcedureException, got " + cause,
      cause instanceof TestProcedureException);

    assertEquals(state.toString(), 6, state.size());
    assertEquals("rootProc-execute", state.get(0));
    assertEquals("subProc1-execute", state.get(1));
    assertEquals("subProc2-execute", state.get(2));
    assertEquals("subProc2-rollback", state.get(3));
    assertEquals("subProc1-rollback", state.get(4));
    assertEquals("rootProc-rollback", state.get(5));
  }

  /**
   * Procedure that fails in {@code execute} and whose {@code rollback} throws an
   * {@link IOException} on its first two invocations before completing on the third.
   */
  public static class TestFaultyRollback extends SequentialProcedure<Void> {
    private int retries = 0;

    public TestFaultyRollback() { }

    @Override
    protected Procedure<Void>[] execute(Void env) {
      setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
      return null;
    }

    @Override
    protected void rollback(Void env) throws IOException {
      if (++retries < 3) {
        LOG.info("inject rollback failure {}", retries);
        throw new IOException("injected failure number " + retries);
      }
      LOG.info("execute non faulty rollback step retries={}", retries);
    }

    @Override
    protected boolean abort(Void env) {
      return false;
    }
  }

  /**
   * A rollback that throws a couple of times must not replace the original failure: the result
   * is still failed with the {@link TestProcedureException} set by {@code execute}.
   */
  @Test
  public void testRollbackRetriableFailure() {
    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());

    Procedure<?> result = procExecutor.getResult(procId);
    assertTrue("expected a failure", result.isFailed());
    LOG.info(result.getException().getMessage());
    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
    assertTrue("expected TestProcedureException, got " + cause,
      cause instanceof TestProcedureException);
  }

  /**
   * Procedure that moves itself to {@link ProcedureState#WAITING_TIMEOUT} in {@code execute},
   * optionally returning a single {@link TestWaitChild}, and records its steps in a shared list.
   */
  public static class TestWaitingProcedure extends SequentialProcedure<Void> {
    private final List<String> state;
    private final boolean hasChild;
    private final String name;

    /** No-arg constructor; it always throws, as these tests never expect it to be invoked. */
    public TestWaitingProcedure() {
      throw new UnsupportedOperationException("recovery should not be triggered here");
    }

    /**
     * @param name prefix of the markers appended to {@code state}
     * @param state shared list receiving the step markers
     * @param hasChild whether {@code execute} returns a {@link TestWaitChild}
     */
    public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
      this.hasChild = hasChild;
      this.state = state;
      this.name = name;
    }

    // Java cannot create a generic array, hence the raw Procedure[] and the unchecked conversion.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    protected Procedure<Void>[] execute(Void env) {
      state.add(name + "-execute");
      setState(ProcedureState.WAITING_TIMEOUT);
      return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
    }

    @Override
    protected void rollback(Void env) {
      state.add(name + "-rollback");
    }

    @Override
    protected boolean abort(Void env) {
      state.add(name + "-abort");
      return true;
    }

    /**
     * Child of {@link TestWaitingProcedure} that completes immediately; its {@code rollback}
     * throws because the tests do not expect a successful child to be rolled back.
     */
    public static class TestWaitChild extends SequentialProcedure<Void> {
      private final List<String> state;
      private final String name;

      /** No-arg constructor; it always throws, as these tests never expect it to be invoked. */
      public TestWaitChild() {
        throw new UnsupportedOperationException("recovery should not be triggered here");
      }

      /**
       * @param name prefix of the markers appended to {@code state}
       * @param state shared list receiving the step markers
       */
      public TestWaitChild(String name, List<String> state) {
        this.name = name;
        this.state = state;
      }

      @Override
      protected Procedure<Void>[] execute(Void env) {
        state.add(name + "-child-execute");
        return null;
      }

      @Override
      protected void rollback(Void env) {
        throw new UnsupportedOperationException("should not rollback a successful child procedure");
      }

      @Override
      protected boolean abort(Void env) {
        state.add(name + "-child-abort");
        return true;
      }
    }
  }

  /**
   * A waiting procedure with a timeout must fail with a timeout error no earlier than the
   * configured timeout, and must be rolled back.
   */
  @Test
  public void testAbortTimeout() {
    List<String> state = new ArrayList<>();
    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, false);
    proc.setTimeout(PROC_TIMEOUT_MSEC);
    long startTime = EnvironmentEdgeManager.currentTime();
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    long execTime = EnvironmentEdgeManager.currentTime() - startTime;
    LOG.info(Objects.toString(state));
    assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
    Procedure<?> result = procExecutor.getResult(rootId);
    assertTrue(state.toString(), result.isFailed());
    ProcedureTestingUtility.assertIsTimeoutException(result);
    assertEquals(state.toString(), 2, state.size());
    assertEquals("wproc-execute", state.get(0));
    assertEquals("wproc-rollback", state.get(1));
  }

  /**
   * Same as {@link #testAbortTimeout()} but the waiting procedure spawns a child: the child
   * executes, the parent then times out and only the parent is rolled back.
   */
  @Test
  public void testAbortTimeoutWithChildren() {
    List<String> state = new ArrayList<>();
    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, true);
    proc.setTimeout(PROC_TIMEOUT_MSEC);
    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    LOG.info(Objects.toString(state));
    Procedure<?> result = procExecutor.getResult(rootId);
    assertTrue(state.toString(), result.isFailed());
    ProcedureTestingUtility.assertIsTimeoutException(result);
    assertEquals(state.toString(), 3, state.size());
    assertEquals("wproc-execute", state.get(0));
    assertEquals("wproc-child-execute", state.get(1));
    assertEquals("wproc-rollback", state.get(2));
  }
}