001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertInstanceOf; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.Objects; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 032import org.apache.hadoop.hbase.testclassification.MasterTests; 033import org.apache.hadoop.hbase.testclassification.SmallTests; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.junit.jupiter.api.AfterEach; 036import org.junit.jupiter.api.BeforeEach; 037import org.junit.jupiter.api.Tag; 038import org.junit.jupiter.api.Test; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 043 044@Tag(MasterTests.TAG) 045@Tag(SmallTests.TAG) 046public class TestProcedureExecution { 047 048 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecution.class); 049 050 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 051 private static final Procedure<?> NULL_PROC = null; 052 053 private ProcedureExecutor<Void> procExecutor; 054 private ProcedureStore procStore; 055 056 private HBaseCommonTestingUtil htu; 057 private FileSystem fs; 058 private Path testDir; 059 private Path logDir; 060 061 @BeforeEach 062 public void setUp() throws IOException { 063 htu = new HBaseCommonTestingUtil(); 064 testDir = htu.getDataTestDir(); 065 fs = testDir.getFileSystem(htu.getConfiguration()); 066 assertTrue(testDir.depth() > 1); 067 068 logDir = new Path(testDir, "proc-logs"); 069 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 070 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore); 071 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 072 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 073 } 074 075 @AfterEach 076 public void tearDown() throws IOException { 077 procExecutor.stop(); 078 procStore.stop(false); 079 fs.delete(logDir, true); 080 } 081 082 private static class TestProcedureException extends IOException { 083 084 private static final long serialVersionUID = 8798565784658913798L; 085 086 public TestProcedureException(String msg) { 087 super(msg); 088 } 089 } 090 091 public static class TestSequentialProcedure extends SequentialProcedure<Void> { 092 private final Procedure<Void>[] subProcs; 093 private final List<String> state; 094 private final Exception failure; 095 private final String name; 096 097 public TestSequentialProcedure() { 098 throw new UnsupportedOperationException("recovery should not be triggered here"); 099 } 100 101 public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) { 102 this.state = state; 103 this.subProcs = subProcs; 104 this.name = name; 105 this.failure = null; 106 } 107 108 public TestSequentialProcedure(String name, List<String> state, Exception failure) { 109 this.state = state; 110 this.subProcs = null; 111 this.name = name; 112 this.failure = failure; 113 } 114 115 @Override 116 protected Procedure<Void>[] execute(Void env) { 117 state.add(name + "-execute"); 118 if (failure != null) { 119 setFailure(new RemoteProcedureException(name + "-failure", failure)); 120 return null; 121 } 122 return subProcs; 123 } 124 125 @Override 126 protected void rollback(Void env) { 127 state.add(name + "-rollback"); 128 } 129 130 @Override 131 protected boolean abort(Void env) { 132 state.add(name + "-abort"); 133 return true; 134 } 135 } 136 137 @Test 138 public void testBadSubprocList() { 139 List<String> state = new ArrayList<>(); 140 Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state); 141 Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC); 142 Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1); 143 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); 144 145 // subProc1 has a "null" subprocedure which is catched as InvalidArgument 146 // failed state with 2 execute and 2 rollback 147 LOG.info(Objects.toString(state)); 148 Procedure<?> result = procExecutor.getResult(rootId); 149 assertTrue(result.isFailed(), state.toString()); 150 ProcedureTestingUtility.assertIsIllegalArgumentException(result); 151 152 assertEquals(4, state.size(), state.toString()); 153 assertEquals("rootProc-execute", state.get(0)); 154 assertEquals("subProc1-execute", state.get(1)); 155 assertEquals("subProc1-rollback", state.get(2)); 156 assertEquals("rootProc-rollback", state.get(3)); 157 } 158 159 @Test 160 public void testSingleSequentialProc() { 161 List<String> state = new ArrayList<>(); 162 Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state); 163 Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2); 164 Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1); 165 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); 166 167 // successful state, with 3 execute 168 LOG.info(Objects.toString(state)); 169 Procedure<?> result = procExecutor.getResult(rootId); 170 ProcedureTestingUtility.assertProcNotFailed(result); 171 assertEquals(3, state.size(), state.toString()); 172 } 173 174 @Test 175 public void testSingleSequentialProcRollback() { 176 List<String> state = new ArrayList<>(); 177 Procedure<Void> subProc2 = 178 new TestSequentialProcedure("subProc2", state, new TestProcedureException("fail test")); 179 Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2); 180 Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1); 181 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc); 182 183 // the 3rd proc fail, rollback after 2 successful execution 184 LOG.info(Objects.toString(state)); 185 Procedure<?> result = procExecutor.getResult(rootId); 186 assertTrue(result.isFailed(), state.toString()); 187 LOG.info(result.getException().getMessage()); 188 Throwable cause = ProcedureTestingUtility.getExceptionCause(result); 189 assertTrue(cause instanceof TestProcedureException, 190 "expected TestProcedureException, got " + cause); 191 192 assertEquals(6, state.size(), state.toString()); 193 assertEquals("rootProc-execute", state.get(0)); 194 assertEquals("subProc1-execute", state.get(1)); 195 assertEquals("subProc2-execute", state.get(2)); 196 assertEquals("subProc2-rollback", state.get(3)); 197 assertEquals("subProc1-rollback", state.get(4)); 198 assertEquals("rootProc-rollback", state.get(5)); 199 } 200 201 public static class TestFaultyRollback extends SequentialProcedure<Void> { 202 private int retries = 0; 203 204 public TestFaultyRollback() { 205 } 206 207 @Override 208 protected Procedure<Void>[] execute(Void env) { 209 setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback")); 210 return null; 211 } 212 213 @Override 214 protected void rollback(Void env) throws IOException { 215 if (++retries < 3) { 216 LOG.info("inject rollback failure {}", retries); 217 throw new IOException("injected failure number " + retries); 218 } 219 LOG.info("execute non faulty rollback step retries={}", retries); 220 } 221 222 @Override 223 protected boolean abort(Void env) { 224 return false; 225 } 226 } 227 228 @Test 229 public void testRollbackRetriableFailure() { 230 long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback()); 231 232 Procedure<?> result = procExecutor.getResult(procId); 233 assertTrue(result.isFailed(), "expected a failure"); 234 LOG.info(result.getException().getMessage()); 235 Throwable cause = ProcedureTestingUtility.getExceptionCause(result); 236 assertInstanceOf(TestProcedureException.class, cause, 237 "expected TestProcedureException, got " + cause); 238 } 239 240 public static class TestWaitingProcedure extends SequentialProcedure<Void> { 241 private final List<String> state; 242 private final boolean hasChild; 243 private final String name; 244 245 public TestWaitingProcedure() { 246 throw new UnsupportedOperationException("recovery should not be triggered here"); 247 } 248 249 public TestWaitingProcedure(String name, List<String> state, boolean hasChild) { 250 this.hasChild = hasChild; 251 this.state = state; 252 this.name = name; 253 } 254 255 @Override 256 protected Procedure<Void>[] execute(Void env) { 257 state.add(name + "-execute"); 258 setState(ProcedureState.WAITING_TIMEOUT); 259 return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null; 260 } 261 262 @Override 263 protected void rollback(Void env) { 264 state.add(name + "-rollback"); 265 } 266 267 @Override 268 protected boolean abort(Void env) { 269 state.add(name + "-abort"); 270 return true; 271 } 272 273 public static class TestWaitChild extends SequentialProcedure<Void> { 274 private final List<String> state; 275 private final String name; 276 277 public TestWaitChild() { 278 throw new UnsupportedOperationException("recovery should not be triggered here"); 279 } 280 281 public TestWaitChild(String name, List<String> state) { 282 this.name = name; 283 this.state = state; 284 } 285 286 @Override 287 protected Procedure<Void>[] execute(Void env) { 288 state.add(name + "-child-execute"); 289 return null; 290 } 291 292 @Override 293 protected void rollback(Void env) { 294 throw new UnsupportedOperationException("should not rollback a successful child procedure"); 295 } 296 297 @Override 298 protected boolean abort(Void env) { 299 state.add(name + "-child-abort"); 300 return true; 301 } 302 } 303 } 304 305 @Test 306 public void testAbortTimeout() { 307 final int PROC_TIMEOUT_MSEC = 2500; 308 List<String> state = new ArrayList<>(); 309 Procedure<Void> proc = new TestWaitingProcedure("wproc", state, false); 310 proc.setTimeout(PROC_TIMEOUT_MSEC); 311 long startTime = EnvironmentEdgeManager.currentTime(); 312 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 313 long execTime = EnvironmentEdgeManager.currentTime() - startTime; 314 LOG.info(Objects.toString(state)); 315 assertTrue(execTime >= PROC_TIMEOUT_MSEC, "we didn't wait enough execTime=" + execTime); 316 Procedure<?> result = procExecutor.getResult(rootId); 317 assertTrue(result.isFailed(), state.toString()); 318 ProcedureTestingUtility.assertIsTimeoutException(result); 319 assertEquals(2, state.size(), state.toString()); 320 assertEquals("wproc-execute", state.get(0)); 321 assertEquals("wproc-rollback", state.get(1)); 322 } 323 324 @Test 325 public void testAbortTimeoutWithChildren() { 326 List<String> state = new ArrayList<>(); 327 Procedure<Void> proc = new TestWaitingProcedure("wproc", state, true); 328 proc.setTimeout(2500); 329 long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc); 330 LOG.info(Objects.toString(state)); 331 Procedure<?> result = procExecutor.getResult(rootId); 332 assertTrue(result.isFailed(), state.toString()); 333 ProcedureTestingUtility.assertIsTimeoutException(result); 334 assertEquals(3, state.size(), state.toString()); 335 assertEquals("wproc-execute", state.get(0)); 336 assertEquals("wproc-child-execute", state.get(1)); 337 assertEquals("wproc-rollback", state.get(2)); 338 } 339}