001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertInstanceOf;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.Objects;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.junit.jupiter.api.AfterEach;
036import org.junit.jupiter.api.BeforeEach;
037import org.junit.jupiter.api.Tag;
038import org.junit.jupiter.api.Test;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
043
044@Tag(MasterTests.TAG)
045@Tag(SmallTests.TAG)
046public class TestProcedureExecution {
047
048  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecution.class);
049
050  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
051  private static final Procedure<?> NULL_PROC = null;
052
053  private ProcedureExecutor<Void> procExecutor;
054  private ProcedureStore procStore;
055
056  private HBaseCommonTestingUtil htu;
057  private FileSystem fs;
058  private Path testDir;
059  private Path logDir;
060
061  @BeforeEach
062  public void setUp() throws IOException {
063    htu = new HBaseCommonTestingUtil();
064    testDir = htu.getDataTestDir();
065    fs = testDir.getFileSystem(htu.getConfiguration());
066    assertTrue(testDir.depth() > 1);
067
068    logDir = new Path(testDir, "proc-logs");
069    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
070    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
071    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
072    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
073  }
074
075  @AfterEach
076  public void tearDown() throws IOException {
077    procExecutor.stop();
078    procStore.stop(false);
079    fs.delete(logDir, true);
080  }
081
082  private static class TestProcedureException extends IOException {
083
084    private static final long serialVersionUID = 8798565784658913798L;
085
086    public TestProcedureException(String msg) {
087      super(msg);
088    }
089  }
090
091  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
092    private final Procedure<Void>[] subProcs;
093    private final List<String> state;
094    private final Exception failure;
095    private final String name;
096
097    public TestSequentialProcedure() {
098      throw new UnsupportedOperationException("recovery should not be triggered here");
099    }
100
101    public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
102      this.state = state;
103      this.subProcs = subProcs;
104      this.name = name;
105      this.failure = null;
106    }
107
108    public TestSequentialProcedure(String name, List<String> state, Exception failure) {
109      this.state = state;
110      this.subProcs = null;
111      this.name = name;
112      this.failure = failure;
113    }
114
115    @Override
116    protected Procedure<Void>[] execute(Void env) {
117      state.add(name + "-execute");
118      if (failure != null) {
119        setFailure(new RemoteProcedureException(name + "-failure", failure));
120        return null;
121      }
122      return subProcs;
123    }
124
125    @Override
126    protected void rollback(Void env) {
127      state.add(name + "-rollback");
128    }
129
130    @Override
131    protected boolean abort(Void env) {
132      state.add(name + "-abort");
133      return true;
134    }
135  }
136
137  @Test
138  public void testBadSubprocList() {
139    List<String> state = new ArrayList<>();
140    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
141    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
142    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
143    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
144
145    // subProc1 has a "null" subprocedure which is catched as InvalidArgument
146    // failed state with 2 execute and 2 rollback
147    LOG.info(Objects.toString(state));
148    Procedure<?> result = procExecutor.getResult(rootId);
149    assertTrue(result.isFailed(), state.toString());
150    ProcedureTestingUtility.assertIsIllegalArgumentException(result);
151
152    assertEquals(4, state.size(), state.toString());
153    assertEquals("rootProc-execute", state.get(0));
154    assertEquals("subProc1-execute", state.get(1));
155    assertEquals("subProc1-rollback", state.get(2));
156    assertEquals("rootProc-rollback", state.get(3));
157  }
158
159  @Test
160  public void testSingleSequentialProc() {
161    List<String> state = new ArrayList<>();
162    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
163    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
164    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
165    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
166
167    // successful state, with 3 execute
168    LOG.info(Objects.toString(state));
169    Procedure<?> result = procExecutor.getResult(rootId);
170    ProcedureTestingUtility.assertProcNotFailed(result);
171    assertEquals(3, state.size(), state.toString());
172  }
173
174  @Test
175  public void testSingleSequentialProcRollback() {
176    List<String> state = new ArrayList<>();
177    Procedure<Void> subProc2 =
178      new TestSequentialProcedure("subProc2", state, new TestProcedureException("fail test"));
179    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
180    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
181    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
182
183    // the 3rd proc fail, rollback after 2 successful execution
184    LOG.info(Objects.toString(state));
185    Procedure<?> result = procExecutor.getResult(rootId);
186    assertTrue(result.isFailed(), state.toString());
187    LOG.info(result.getException().getMessage());
188    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
189    assertTrue(cause instanceof TestProcedureException,
190      "expected TestProcedureException, got " + cause);
191
192    assertEquals(6, state.size(), state.toString());
193    assertEquals("rootProc-execute", state.get(0));
194    assertEquals("subProc1-execute", state.get(1));
195    assertEquals("subProc2-execute", state.get(2));
196    assertEquals("subProc2-rollback", state.get(3));
197    assertEquals("subProc1-rollback", state.get(4));
198    assertEquals("rootProc-rollback", state.get(5));
199  }
200
201  public static class TestFaultyRollback extends SequentialProcedure<Void> {
202    private int retries = 0;
203
204    public TestFaultyRollback() {
205    }
206
207    @Override
208    protected Procedure<Void>[] execute(Void env) {
209      setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
210      return null;
211    }
212
213    @Override
214    protected void rollback(Void env) throws IOException {
215      if (++retries < 3) {
216        LOG.info("inject rollback failure {}", retries);
217        throw new IOException("injected failure number " + retries);
218      }
219      LOG.info("execute non faulty rollback step retries={}", retries);
220    }
221
222    @Override
223    protected boolean abort(Void env) {
224      return false;
225    }
226  }
227
228  @Test
229  public void testRollbackRetriableFailure() {
230    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());
231
232    Procedure<?> result = procExecutor.getResult(procId);
233    assertTrue(result.isFailed(), "expected a failure");
234    LOG.info(result.getException().getMessage());
235    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
236    assertInstanceOf(TestProcedureException.class, cause,
237      "expected TestProcedureException, got " + cause);
238  }
239
240  public static class TestWaitingProcedure extends SequentialProcedure<Void> {
241    private final List<String> state;
242    private final boolean hasChild;
243    private final String name;
244
245    public TestWaitingProcedure() {
246      throw new UnsupportedOperationException("recovery should not be triggered here");
247    }
248
249    public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
250      this.hasChild = hasChild;
251      this.state = state;
252      this.name = name;
253    }
254
255    @Override
256    protected Procedure<Void>[] execute(Void env) {
257      state.add(name + "-execute");
258      setState(ProcedureState.WAITING_TIMEOUT);
259      return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
260    }
261
262    @Override
263    protected void rollback(Void env) {
264      state.add(name + "-rollback");
265    }
266
267    @Override
268    protected boolean abort(Void env) {
269      state.add(name + "-abort");
270      return true;
271    }
272
273    public static class TestWaitChild extends SequentialProcedure<Void> {
274      private final List<String> state;
275      private final String name;
276
277      public TestWaitChild() {
278        throw new UnsupportedOperationException("recovery should not be triggered here");
279      }
280
281      public TestWaitChild(String name, List<String> state) {
282        this.name = name;
283        this.state = state;
284      }
285
286      @Override
287      protected Procedure<Void>[] execute(Void env) {
288        state.add(name + "-child-execute");
289        return null;
290      }
291
292      @Override
293      protected void rollback(Void env) {
294        throw new UnsupportedOperationException("should not rollback a successful child procedure");
295      }
296
297      @Override
298      protected boolean abort(Void env) {
299        state.add(name + "-child-abort");
300        return true;
301      }
302    }
303  }
304
305  @Test
306  public void testAbortTimeout() {
307    final int PROC_TIMEOUT_MSEC = 2500;
308    List<String> state = new ArrayList<>();
309    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, false);
310    proc.setTimeout(PROC_TIMEOUT_MSEC);
311    long startTime = EnvironmentEdgeManager.currentTime();
312    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
313    long execTime = EnvironmentEdgeManager.currentTime() - startTime;
314    LOG.info(Objects.toString(state));
315    assertTrue(execTime >= PROC_TIMEOUT_MSEC, "we didn't wait enough execTime=" + execTime);
316    Procedure<?> result = procExecutor.getResult(rootId);
317    assertTrue(result.isFailed(), state.toString());
318    ProcedureTestingUtility.assertIsTimeoutException(result);
319    assertEquals(2, state.size(), state.toString());
320    assertEquals("wproc-execute", state.get(0));
321    assertEquals("wproc-rollback", state.get(1));
322  }
323
324  @Test
325  public void testAbortTimeoutWithChildren() {
326    List<String> state = new ArrayList<>();
327    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, true);
328    proc.setTimeout(2500);
329    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
330    LOG.info(Objects.toString(state));
331    Procedure<?> result = procExecutor.getResult(rootId);
332    assertTrue(result.isFailed(), state.toString());
333    ProcedureTestingUtility.assertIsTimeoutException(result);
334    assertEquals(3, state.size(), state.toString());
335    assertEquals("wproc-execute", state.get(0));
336    assertEquals("wproc-child-execute", state.get(1));
337    assertEquals("wproc-rollback", state.get(2));
338  }
339}