001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Objects;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
044
045@Category({MasterTests.class, SmallTests.class})
046public class TestProcedureExecution {
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049      HBaseClassTestRule.forClass(TestProcedureExecution.class);
050
051  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecution.class);
052
053  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
054  private static final Procedure<?> NULL_PROC = null;
055
056  private ProcedureExecutor<Void> procExecutor;
057  private ProcedureStore procStore;
058
059  private HBaseCommonTestingUtility htu;
060  private FileSystem fs;
061  private Path testDir;
062  private Path logDir;
063
064  @Before
065  public void setUp() throws IOException {
066    htu = new HBaseCommonTestingUtility();
067    testDir = htu.getDataTestDir();
068    fs = testDir.getFileSystem(htu.getConfiguration());
069    assertTrue(testDir.depth() > 1);
070
071    logDir = new Path(testDir, "proc-logs");
072    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
073    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
074    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
075    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
076  }
077
078  @After
079  public void tearDown() throws IOException {
080    procExecutor.stop();
081    procStore.stop(false);
082    fs.delete(logDir, true);
083  }
084
085  private static class TestProcedureException extends IOException {
086
087    private static final long serialVersionUID = 8798565784658913798L;
088
089    public TestProcedureException(String msg) {
090      super(msg);
091    }
092  }
093
094  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
095    private final Procedure<Void>[] subProcs;
096    private final List<String> state;
097    private final Exception failure;
098    private final String name;
099
100    public TestSequentialProcedure() {
101      throw new UnsupportedOperationException("recovery should not be triggered here");
102    }
103
104    public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
105      this.state = state;
106      this.subProcs = subProcs;
107      this.name = name;
108      this.failure = null;
109    }
110
111    public TestSequentialProcedure(String name, List<String> state, Exception failure) {
112      this.state = state;
113      this.subProcs = null;
114      this.name = name;
115      this.failure = failure;
116    }
117
118    @Override
119    protected Procedure<Void>[] execute(Void env) {
120      state.add(name + "-execute");
121      if (failure != null) {
122        setFailure(new RemoteProcedureException(name + "-failure", failure));
123        return null;
124      }
125      return subProcs;
126    }
127
128    @Override
129    protected void rollback(Void env) {
130      state.add(name + "-rollback");
131    }
132
133    @Override
134    protected boolean abort(Void env) {
135      state.add(name + "-abort");
136      return true;
137    }
138  }
139
140  @Test
141  public void testBadSubprocList() {
142    List<String> state = new ArrayList<>();
143    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
144    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
145    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
146    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
147
148    // subProc1 has a "null" subprocedure which is catched as InvalidArgument
149    // failed state with 2 execute and 2 rollback
150    LOG.info(Objects.toString(state));
151    Procedure<?> result = procExecutor.getResult(rootId);
152    assertTrue(state.toString(), result.isFailed());
153    ProcedureTestingUtility.assertIsIllegalArgumentException(result);
154
155    assertEquals(state.toString(), 4, state.size());
156    assertEquals("rootProc-execute", state.get(0));
157    assertEquals("subProc1-execute", state.get(1));
158    assertEquals("subProc1-rollback", state.get(2));
159    assertEquals("rootProc-rollback", state.get(3));
160  }
161
162  @Test
163  public void testSingleSequentialProc() {
164    List<String> state = new ArrayList<>();
165    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
166    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
167    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
168    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
169
170    // successful state, with 3 execute
171    LOG.info(Objects.toString(state));
172    Procedure<?> result = procExecutor.getResult(rootId);
173    ProcedureTestingUtility.assertProcNotFailed(result);
174    assertEquals(state.toString(), 3, state.size());
175  }
176
177  @Test
178  public void testSingleSequentialProcRollback() {
179    List<String> state = new ArrayList<>();
180    Procedure<Void> subProc2 =
181      new TestSequentialProcedure("subProc2", state, new TestProcedureException("fail test"));
182    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
183    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
184    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
185
186    // the 3rd proc fail, rollback after 2 successful execution
187    LOG.info(Objects.toString(state));
188    Procedure<?> result = procExecutor.getResult(rootId);
189    assertTrue(state.toString(), result.isFailed());
190    LOG.info(result.getException().getMessage());
191    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
192    assertTrue("expected TestProcedureException, got " + cause,
193      cause instanceof TestProcedureException);
194
195    assertEquals(state.toString(), 6, state.size());
196    assertEquals("rootProc-execute", state.get(0));
197    assertEquals("subProc1-execute", state.get(1));
198    assertEquals("subProc2-execute", state.get(2));
199    assertEquals("subProc2-rollback", state.get(3));
200    assertEquals("subProc1-rollback", state.get(4));
201    assertEquals("rootProc-rollback", state.get(5));
202  }
203
204  public static class TestFaultyRollback extends SequentialProcedure<Void> {
205    private int retries = 0;
206
207    public TestFaultyRollback() { }
208
209    @Override
210    protected Procedure<Void>[] execute(Void env) {
211      setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
212      return null;
213    }
214
215    @Override
216    protected void rollback(Void env) throws IOException {
217      if (++retries < 3) {
218        LOG.info("inject rollback failure " + retries);
219        throw new IOException("injected failure number " + retries);
220      }
221      LOG.info("execute non faulty rollback step retries=" + retries);
222    }
223
224    @Override
225    protected boolean abort(Void env) {
226      return false;
227    }
228  }
229
230  @Test
231  public void testRollbackRetriableFailure() {
232    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());
233
234    Procedure<?> result = procExecutor.getResult(procId);
235    assertTrue("expected a failure", result.isFailed());
236    LOG.info(result.getException().getMessage());
237    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
238    assertTrue("expected TestProcedureException, got " + cause,
239      cause instanceof TestProcedureException);
240  }
241
242  public static class TestWaitingProcedure extends SequentialProcedure<Void> {
243    private final List<String> state;
244    private final boolean hasChild;
245    private final String name;
246
247    public TestWaitingProcedure() {
248      throw new UnsupportedOperationException("recovery should not be triggered here");
249    }
250
251    public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
252      this.hasChild = hasChild;
253      this.state = state;
254      this.name = name;
255    }
256
257    @Override
258    protected Procedure<Void>[] execute(Void env) {
259      state.add(name + "-execute");
260      setState(ProcedureState.WAITING_TIMEOUT);
261      return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
262    }
263
264    @Override
265    protected void rollback(Void env) {
266      state.add(name + "-rollback");
267    }
268
269    @Override
270    protected boolean abort(Void env) {
271      state.add(name + "-abort");
272      return true;
273    }
274
275    public static class TestWaitChild extends SequentialProcedure<Void> {
276      private final List<String> state;
277      private final String name;
278
279      public TestWaitChild() {
280        throw new UnsupportedOperationException("recovery should not be triggered here");
281      }
282
283      public TestWaitChild(String name, List<String> state) {
284        this.name = name;
285        this.state = state;
286      }
287
288      @Override
289      protected Procedure<Void>[] execute(Void env) {
290        state.add(name + "-child-execute");
291        return null;
292      }
293
294      @Override
295      protected void rollback(Void env) {
296        throw new UnsupportedOperationException("should not rollback a successful child procedure");
297      }
298
299      @Override
300      protected boolean abort(Void env) {
301        state.add(name + "-child-abort");
302        return true;
303      }
304    }
305  }
306
307  @Test
308  public void testAbortTimeout() {
309    final int PROC_TIMEOUT_MSEC = 2500;
310    List<String> state = new ArrayList<>();
311    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, false);
312    proc.setTimeout(PROC_TIMEOUT_MSEC);
313    long startTime = EnvironmentEdgeManager.currentTime();
314    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
315    long execTime = EnvironmentEdgeManager.currentTime() - startTime;
316    LOG.info(Objects.toString(state));
317    assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
318    Procedure<?> result = procExecutor.getResult(rootId);
319    assertTrue(state.toString(), result.isFailed());
320    ProcedureTestingUtility.assertIsTimeoutException(result);
321    assertEquals(state.toString(), 2, state.size());
322    assertEquals("wproc-execute", state.get(0));
323    assertEquals("wproc-rollback", state.get(1));
324  }
325
326  @Test
327  public void testAbortTimeoutWithChildren() {
328    List<String> state = new ArrayList<>();
329    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, true);
330    proc.setTimeout(2500);
331    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
332    LOG.info(Objects.toString(state));
333    Procedure<?> result = procExecutor.getResult(rootId);
334    assertTrue(state.toString(), result.isFailed());
335    ProcedureTestingUtility.assertIsTimeoutException(result);
336    assertEquals(state.toString(), 3, state.size());
337    assertEquals("wproc-execute", state.get(0));
338    assertEquals("wproc-child-execute", state.get(1));
339    assertEquals("wproc-rollback", state.get(2));
340  }
341}