001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Objects; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 032import org.apache.hadoop.hbase.testclassification.MasterTests; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.junit.After; 036import org.junit.Before; 037import org.junit.ClassRule; 038import org.junit.Test; 039import org.junit.experimental.categories.Category; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 044 045@Category({MasterTests.class, SmallTests.class}) 046public class TestProcedureExecution { 047 048 @ClassRule 049 public static final HBaseClassTestRule CLASS_RULE = 050 HBaseClassTestRule.forClass(TestProcedureExecution.class); 051 052 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecution.class); 053 054 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 055 private static final Procedure<?> NULL_PROC = null; 056 057 private ProcedureExecutor<Void> procExecutor; 058 private ProcedureStore procStore; 059 060 private HBaseCommonTestingUtility htu; 061 private FileSystem fs; 062 private Path testDir; 063 private Path logDir; 064 065 @Before 066 public void setUp() throws IOException { 067 htu = new HBaseCommonTestingUtility(); 068 testDir = htu.getDataTestDir(); 069 fs = testDir.getFileSystem(htu.getConfiguration()); 070 assertTrue(testDir.depth() > 1); 071 072 logDir = new Path(testDir, "proc-logs"); 073 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 074 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore); 075 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 076 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 077 } 078 079 @After 080 public void tearDown() throws IOException { 081 procExecutor.stop(); 082 procStore.stop(false); 083 fs.delete(logDir, true); 084 } 085 086 private static class TestProcedureException extends IOException { 087 088 private static final long serialVersionUID = 8798565784658913798L; 089 090 public TestProcedureException(String msg) { 091 super(msg); 092 } 093 } 094 095 public static class TestSequentialProcedure extends SequentialProcedure<Void> { 096 private final Procedure<Void>[] subProcs; 097 private final List<String> state; 098 private final Exception failure; 099 private final String name; 100 101 public TestSequentialProcedure() { 102 throw new UnsupportedOperationException("recovery should not be triggered here"); 103 } 104 105 public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) { 106 this.state = state; 107 this.subProcs = subProcs; 108 this.name = name; 109 this.failure = null; 110 } 111 112 public TestSequentialProcedure(String name, List<String> state, Exception failure) { 113 this.state = state; 114 this.subProcs = null; 115 this.name = name; 116 this.failure = failure; 117 } 118 119 @Override 120 protected Procedure<Void>[] execute(Void env) { 121 state.add(name + "-execute"); 122 if (failure != null) { 123 setFailure(new RemoteProcedureException(name + "-failure", failure)); 124 return null; 125 } 126 return subProcs; 127 } 128 129 @Override 130 protected void rollback(Void env) { 131 state.add(name + "-rollback"); 132 } 133 134 @Override 135 protected boolean abort(Void env) { 136 state.add(name + "-abort"); 137 return true; 138 } 139 } 140 141 @Test 142 public void testBadSubprocList() { 143 List<String> state = new ArrayList<>(); 144 Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state); 145 Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC); 146 Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1); 147 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); 148 149 // subProc1 has a "null" subprocedure which is catched as InvalidArgument 150 // failed state with 2 execute and 2 rollback 151 LOG.info(Objects.toString(state)); 152 Procedure<?> result = procExecutor.getResult(rootId); 153 assertTrue(state.toString(), result.isFailed()); 154 ProcedureTestingUtility.assertIsIllegalArgumentException(result); 155 156 assertEquals(state.toString(), 4, state.size()); 157 assertEquals("rootProc-execute", state.get(0)); 158 assertEquals("subProc1-execute", state.get(1)); 159 assertEquals("subProc1-rollback", state.get(2)); 160 assertEquals("rootProc-rollback", state.get(3)); 161 } 162 163 @Test 164 public void testSingleSequentialProc() { 165 List<String> state = new ArrayList<>(); 166 Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state); 167 Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2); 168 Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1); 169 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); 170 171 // successful state, with 3 execute 172 LOG.info(Objects.toString(state)); 173 Procedure<?> result = procExecutor.getResult(rootId); 174 ProcedureTestingUtility.assertProcNotFailed(result); 175 assertEquals(state.toString(), 3, state.size()); 176 } 177 178 @Test 179 public void testSingleSequentialProcRollback() { 180 List<String> state = new ArrayList<>(); 181 Procedure<Void> subProc2 = 182 new TestSequentialProcedure("subProc2", state, new TestProcedureException("fail test")); 183 Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2); 184 Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1); 185 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); 186 187 // the 3rd proc fail, rollback after 2 successful execution 188 LOG.info(Objects.toString(state)); 189 Procedure<?> result = procExecutor.getResult(rootId); 190 assertTrue(state.toString(), result.isFailed()); 191 LOG.info(result.getException().getMessage()); 192 Throwable cause = ProcedureTestingUtility.getExceptionCause(result); 193 assertTrue("expected TestProcedureException, got " + cause, 194 cause instanceof TestProcedureException); 195 196 assertEquals(state.toString(), 6, state.size()); 197 assertEquals("rootProc-execute", state.get(0)); 198 assertEquals("subProc1-execute", state.get(1)); 199 assertEquals("subProc2-execute", state.get(2)); 200 assertEquals("subProc2-rollback", state.get(3)); 201 assertEquals("subProc1-rollback", state.get(4)); 202 assertEquals("rootProc-rollback", state.get(5)); 203 } 204 205 public static class TestFaultyRollback extends SequentialProcedure<Void> { 206 private int retries = 0; 207 208 public TestFaultyRollback() { } 209 210 @Override 211 protected Procedure<Void>[] execute(Void env) { 212 setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback")); 213 return null; 214 } 215 216 @Override 217 protected void rollback(Void env) throws IOException { 218 if (++retries < 3) { 219 LOG.info("inject rollback failure " + retries); 220 throw new IOException("injected failure number " + retries); 221 } 222 LOG.info("execute non faulty rollback step retries=" + retries); 223 } 224 225 @Override 226 protected boolean abort(Void env) { return false; } 227 } 228 229 @Test 230 public void testRollbackRetriableFailure() { 231 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback()); 232 233 Procedure<?> result = procExecutor.getResult(procId); 234 assertTrue("expected a failure", result.isFailed()); 235 LOG.info(result.getException().getMessage()); 236 Throwable cause = ProcedureTestingUtility.getExceptionCause(result); 237 assertTrue("expected TestProcedureException, got " + cause, 238 cause instanceof TestProcedureException); 239 } 240 241 public static class TestWaitingProcedure extends SequentialProcedure<Void> { 242 private final List<String> state; 243 private final boolean hasChild; 244 private final String name; 245 246 public TestWaitingProcedure() { 247 throw new UnsupportedOperationException("recovery should not be triggered here"); 248 } 249 250 public TestWaitingProcedure(String name, List<String> state, boolean hasChild) { 251 this.hasChild = hasChild; 252 this.state = state; 253 this.name = name; 254 } 255 256 @Override 257 protected Procedure<Void>[] execute(Void env) { 258 state.add(name + "-execute"); 259 setState(ProcedureState.WAITING_TIMEOUT); 260 return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null; 261 } 262 263 @Override 264 protected void rollback(Void env) { 265 state.add(name + "-rollback"); 266 } 267 268 @Override 269 protected boolean abort(Void env) { 270 state.add(name + "-abort"); 271 return true; 272 } 273 274 public static class TestWaitChild extends SequentialProcedure<Void> { 275 private final List<String> state; 276 private final String name; 277 278 public TestWaitChild() { 279 throw new UnsupportedOperationException("recovery should not be triggered here"); 280 } 281 282 public TestWaitChild(String name, List<String> state) { 283 this.name = name; 284 this.state = state; 285 } 286 287 @Override 288 protected Procedure<Void>[] execute(Void env) { 289 state.add(name + "-child-execute"); 290 return null; 291 } 292 293 @Override 294 protected void rollback(Void env) { 295 throw new UnsupportedOperationException("should not rollback a successful child procedure"); 296 } 297 298 @Override 299 protected boolean abort(Void env) { 300 state.add(name + "-child-abort"); 301 return true; 302 } 303 } 304 } 305 306 @Test 307 public void testAbortTimeout() { 308 final int PROC_TIMEOUT_MSEC = 2500; 309 List<String> state = new ArrayList<>(); 310 Procedure<Void> proc = new TestWaitingProcedure("wproc", state, false); 311 proc.setTimeout(PROC_TIMEOUT_MSEC); 312 long startTime = EnvironmentEdgeManager.currentTime(); 313 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 314 long execTime = EnvironmentEdgeManager.currentTime() - startTime; 315 LOG.info(Objects.toString(state)); 316 assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC); 317 Procedure<?> result = procExecutor.getResult(rootId); 318 assertTrue(state.toString(), result.isFailed()); 319 ProcedureTestingUtility.assertIsTimeoutException(result); 320 assertEquals(state.toString(), 2, state.size()); 321 assertEquals("wproc-execute", state.get(0)); 322 assertEquals("wproc-rollback", state.get(1)); 323 } 324 325 @Test 326 public void testAbortTimeoutWithChildren() { 327 List<String> state = new ArrayList<>(); 328 Procedure<Void> proc = new TestWaitingProcedure("wproc", state, true); 329 proc.setTimeout(2500); 330 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 331 LOG.info(Objects.toString(state)); 332 Procedure<?> result = procExecutor.getResult(rootId); 333 assertTrue(state.toString(), result.isFailed()); 334 ProcedureTestingUtility.assertIsTimeoutException(result); 335 assertEquals(state.toString(), 3, state.size()); 336 assertEquals("wproc-execute", state.get(0)); 337 assertEquals("wproc-child-execute", state.get(1)); 338 assertEquals("wproc-rollback", state.get(2)); 339 } 340}