001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Objects;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
044
045@Category({ MasterTests.class, SmallTests.class })
046public class TestProcedureExecution {
  // JUnit class rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestProcedureExecution.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecution.class);

  // Slot count passed to both procStore.start() and initAndStartWorkers() in setUp().
  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
  // Typed null, used by testBadSubprocList to put a null entry into a sub-procedure list.
  private static final Procedure<?> NULL_PROC = null;

  // Executor under test and its backing store; both are created in setUp() and stopped in
  // tearDown().
  private ProcedureExecutor<Void> procExecutor;
  private ProcedureStore procStore;

  // Test directory handles; logDir is "proc-logs" under testDir and is deleted in tearDown().
  private HBaseCommonTestingUtil htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;
063
064  @Before
065  public void setUp() throws IOException {
066    htu = new HBaseCommonTestingUtil();
067    testDir = htu.getDataTestDir();
068    fs = testDir.getFileSystem(htu.getConfiguration());
069    assertTrue(testDir.depth() > 1);
070
071    logDir = new Path(testDir, "proc-logs");
072    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
073    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
074    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
075    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
076  }
077
078  @After
079  public void tearDown() throws IOException {
080    procExecutor.stop();
081    procStore.stop(false);
082    fs.delete(logDir, true);
083  }
084
085  private static class TestProcedureException extends IOException {
086
087    private static final long serialVersionUID = 8798565784658913798L;
088
089    public TestProcedureException(String msg) {
090      super(msg);
091    }
092  }
093
094  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
095    private final Procedure<Void>[] subProcs;
096    private final List<String> state;
097    private final Exception failure;
098    private final String name;
099
100    public TestSequentialProcedure() {
101      throw new UnsupportedOperationException("recovery should not be triggered here");
102    }
103
104    public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
105      this.state = state;
106      this.subProcs = subProcs;
107      this.name = name;
108      this.failure = null;
109    }
110
111    public TestSequentialProcedure(String name, List<String> state, Exception failure) {
112      this.state = state;
113      this.subProcs = null;
114      this.name = name;
115      this.failure = failure;
116    }
117
118    @Override
119    protected Procedure<Void>[] execute(Void env) {
120      state.add(name + "-execute");
121      if (failure != null) {
122        setFailure(new RemoteProcedureException(name + "-failure", failure));
123        return null;
124      }
125      return subProcs;
126    }
127
128    @Override
129    protected void rollback(Void env) {
130      state.add(name + "-rollback");
131    }
132
133    @Override
134    protected boolean abort(Void env) {
135      state.add(name + "-abort");
136      return true;
137    }
138  }
139
140  @Test
141  public void testBadSubprocList() {
142    List<String> state = new ArrayList<>();
143    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
144    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
145    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
146    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
147
148    // subProc1 has a "null" subprocedure which is catched as InvalidArgument
149    // failed state with 2 execute and 2 rollback
150    LOG.info(Objects.toString(state));
151    Procedure<?> result = procExecutor.getResult(rootId);
152    assertTrue(state.toString(), result.isFailed());
153    ProcedureTestingUtility.assertIsIllegalArgumentException(result);
154
155    assertEquals(state.toString(), 4, state.size());
156    assertEquals("rootProc-execute", state.get(0));
157    assertEquals("subProc1-execute", state.get(1));
158    assertEquals("subProc1-rollback", state.get(2));
159    assertEquals("rootProc-rollback", state.get(3));
160  }
161
162  @Test
163  public void testSingleSequentialProc() {
164    List<String> state = new ArrayList<>();
165    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
166    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
167    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
168    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
169
170    // successful state, with 3 execute
171    LOG.info(Objects.toString(state));
172    Procedure<?> result = procExecutor.getResult(rootId);
173    ProcedureTestingUtility.assertProcNotFailed(result);
174    assertEquals(state.toString(), 3, state.size());
175  }
176
177  @Test
178  public void testSingleSequentialProcRollback() {
179    List<String> state = new ArrayList<>();
180    Procedure<Void> subProc2 =
181      new TestSequentialProcedure("subProc2", state, new TestProcedureException("fail test"));
182    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
183    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
184    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
185
186    // the 3rd proc fail, rollback after 2 successful execution
187    LOG.info(Objects.toString(state));
188    Procedure<?> result = procExecutor.getResult(rootId);
189    assertTrue(state.toString(), result.isFailed());
190    LOG.info(result.getException().getMessage());
191    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
192    assertTrue("expected TestProcedureException, got " + cause,
193      cause instanceof TestProcedureException);
194
195    assertEquals(state.toString(), 6, state.size());
196    assertEquals("rootProc-execute", state.get(0));
197    assertEquals("subProc1-execute", state.get(1));
198    assertEquals("subProc2-execute", state.get(2));
199    assertEquals("subProc2-rollback", state.get(3));
200    assertEquals("subProc1-rollback", state.get(4));
201    assertEquals("rootProc-rollback", state.get(5));
202  }
203
204  public static class TestFaultyRollback extends SequentialProcedure<Void> {
205    private int retries = 0;
206
207    public TestFaultyRollback() {
208    }
209
210    @Override
211    protected Procedure<Void>[] execute(Void env) {
212      setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
213      return null;
214    }
215
216    @Override
217    protected void rollback(Void env) throws IOException {
218      if (++retries < 3) {
219        LOG.info("inject rollback failure " + retries);
220        throw new IOException("injected failure number " + retries);
221      }
222      LOG.info("execute non faulty rollback step retries=" + retries);
223    }
224
225    @Override
226    protected boolean abort(Void env) {
227      return false;
228    }
229  }
230
231  @Test
232  public void testRollbackRetriableFailure() {
233    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());
234
235    Procedure<?> result = procExecutor.getResult(procId);
236    assertTrue("expected a failure", result.isFailed());
237    LOG.info(result.getException().getMessage());
238    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
239    assertTrue("expected TestProcedureException, got " + cause,
240      cause instanceof TestProcedureException);
241  }
242
243  public static class TestWaitingProcedure extends SequentialProcedure<Void> {
244    private final List<String> state;
245    private final boolean hasChild;
246    private final String name;
247
248    public TestWaitingProcedure() {
249      throw new UnsupportedOperationException("recovery should not be triggered here");
250    }
251
252    public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
253      this.hasChild = hasChild;
254      this.state = state;
255      this.name = name;
256    }
257
258    @Override
259    protected Procedure<Void>[] execute(Void env) {
260      state.add(name + "-execute");
261      setState(ProcedureState.WAITING_TIMEOUT);
262      return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
263    }
264
265    @Override
266    protected void rollback(Void env) {
267      state.add(name + "-rollback");
268    }
269
270    @Override
271    protected boolean abort(Void env) {
272      state.add(name + "-abort");
273      return true;
274    }
275
276    public static class TestWaitChild extends SequentialProcedure<Void> {
277      private final List<String> state;
278      private final String name;
279
280      public TestWaitChild() {
281        throw new UnsupportedOperationException("recovery should not be triggered here");
282      }
283
284      public TestWaitChild(String name, List<String> state) {
285        this.name = name;
286        this.state = state;
287      }
288
289      @Override
290      protected Procedure<Void>[] execute(Void env) {
291        state.add(name + "-child-execute");
292        return null;
293      }
294
295      @Override
296      protected void rollback(Void env) {
297        throw new UnsupportedOperationException("should not rollback a successful child procedure");
298      }
299
300      @Override
301      protected boolean abort(Void env) {
302        state.add(name + "-child-abort");
303        return true;
304      }
305    }
306  }
307
308  @Test
309  public void testAbortTimeout() {
310    final int PROC_TIMEOUT_MSEC = 2500;
311    List<String> state = new ArrayList<>();
312    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, false);
313    proc.setTimeout(PROC_TIMEOUT_MSEC);
314    long startTime = EnvironmentEdgeManager.currentTime();
315    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
316    long execTime = EnvironmentEdgeManager.currentTime() - startTime;
317    LOG.info(Objects.toString(state));
318    assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
319    Procedure<?> result = procExecutor.getResult(rootId);
320    assertTrue(state.toString(), result.isFailed());
321    ProcedureTestingUtility.assertIsTimeoutException(result);
322    assertEquals(state.toString(), 2, state.size());
323    assertEquals("wproc-execute", state.get(0));
324    assertEquals("wproc-rollback", state.get(1));
325  }
326
327  @Test
328  public void testAbortTimeoutWithChildren() {
329    List<String> state = new ArrayList<>();
330    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, true);
331    proc.setTimeout(2500);
332    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
333    LOG.info(Objects.toString(state));
334    Procedure<?> result = procExecutor.getResult(rootId);
335    assertTrue(state.toString(), result.isFailed());
336    ProcedureTestingUtility.assertIsTimeoutException(result);
337    assertEquals(state.toString(), 3, state.size());
338    assertEquals("wproc-execute", state.get(0));
339    assertEquals("wproc-child-execute", state.get(1));
340    assertEquals("wproc-rollback", state.get(2));
341  }
342}