001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Objects;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
031import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
032import org.apache.hadoop.hbase.testclassification.MasterTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.junit.After;
036import org.junit.Before;
037import org.junit.ClassRule;
038import org.junit.Test;
039import org.junit.experimental.categories.Category;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
044
045@Category({MasterTests.class, SmallTests.class})
046public class TestProcedureExecution {
047
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050      HBaseClassTestRule.forClass(TestProcedureExecution.class);
051
052  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecution.class);
053
054  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
055  private static final Procedure<?> NULL_PROC = null;
056
057  private ProcedureExecutor<Void> procExecutor;
058  private ProcedureStore procStore;
059
060  private HBaseCommonTestingUtility htu;
061  private FileSystem fs;
062  private Path testDir;
063  private Path logDir;
064
065  @Before
066  public void setUp() throws IOException {
067    htu = new HBaseCommonTestingUtility();
068    testDir = htu.getDataTestDir();
069    fs = testDir.getFileSystem(htu.getConfiguration());
070    assertTrue(testDir.depth() > 1);
071
072    logDir = new Path(testDir, "proc-logs");
073    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
074    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore);
075    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
076    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
077  }
078
079  @After
080  public void tearDown() throws IOException {
081    procExecutor.stop();
082    procStore.stop(false);
083    fs.delete(logDir, true);
084  }
085
086  private static class TestProcedureException extends IOException {
087
088    private static final long serialVersionUID = 8798565784658913798L;
089
090    public TestProcedureException(String msg) {
091      super(msg);
092    }
093  }
094
095  public static class TestSequentialProcedure extends SequentialProcedure<Void> {
096    private final Procedure<Void>[] subProcs;
097    private final List<String> state;
098    private final Exception failure;
099    private final String name;
100
101    public TestSequentialProcedure() {
102      throw new UnsupportedOperationException("recovery should not be triggered here");
103    }
104
105    public TestSequentialProcedure(String name, List<String> state, Procedure... subProcs) {
106      this.state = state;
107      this.subProcs = subProcs;
108      this.name = name;
109      this.failure = null;
110    }
111
112    public TestSequentialProcedure(String name, List<String> state, Exception failure) {
113      this.state = state;
114      this.subProcs = null;
115      this.name = name;
116      this.failure = failure;
117    }
118
119    @Override
120    protected Procedure<Void>[] execute(Void env) {
121      state.add(name + "-execute");
122      if (failure != null) {
123        setFailure(new RemoteProcedureException(name + "-failure", failure));
124        return null;
125      }
126      return subProcs;
127    }
128
129    @Override
130    protected void rollback(Void env) {
131      state.add(name + "-rollback");
132    }
133
134    @Override
135    protected boolean abort(Void env) {
136      state.add(name + "-abort");
137      return true;
138    }
139  }
140
141  @Test
142  public void testBadSubprocList() {
143    List<String> state = new ArrayList<>();
144    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
145    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2, NULL_PROC);
146    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
147    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
148
149    // subProc1 has a "null" subprocedure which is catched as InvalidArgument
150    // failed state with 2 execute and 2 rollback
151    LOG.info(Objects.toString(state));
152    Procedure<?> result = procExecutor.getResult(rootId);
153    assertTrue(state.toString(), result.isFailed());
154    ProcedureTestingUtility.assertIsIllegalArgumentException(result);
155
156    assertEquals(state.toString(), 4, state.size());
157    assertEquals("rootProc-execute", state.get(0));
158    assertEquals("subProc1-execute", state.get(1));
159    assertEquals("subProc1-rollback", state.get(2));
160    assertEquals("rootProc-rollback", state.get(3));
161  }
162
163  @Test
164  public void testSingleSequentialProc() {
165    List<String> state = new ArrayList<>();
166    Procedure<Void> subProc2 = new TestSequentialProcedure("subProc2", state);
167    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
168    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
169    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
170
171    // successful state, with 3 execute
172    LOG.info(Objects.toString(state));
173    Procedure<?> result = procExecutor.getResult(rootId);
174    ProcedureTestingUtility.assertProcNotFailed(result);
175    assertEquals(state.toString(), 3, state.size());
176  }
177
178  @Test
179  public void testSingleSequentialProcRollback() {
180    List<String> state = new ArrayList<>();
181    Procedure<Void> subProc2 =
182      new TestSequentialProcedure("subProc2", state, new TestProcedureException("fail test"));
183    Procedure<Void> subProc1 = new TestSequentialProcedure("subProc1", state, subProc2);
184    Procedure<Void> rootProc = new TestSequentialProcedure("rootProc", state, subProc1);
185    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, rootProc);
186
187    // the 3rd proc fail, rollback after 2 successful execution
188    LOG.info(Objects.toString(state));
189    Procedure<?> result = procExecutor.getResult(rootId);
190    assertTrue(state.toString(), result.isFailed());
191    LOG.info(result.getException().getMessage());
192    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
193    assertTrue("expected TestProcedureException, got " + cause,
194      cause instanceof TestProcedureException);
195
196    assertEquals(state.toString(), 6, state.size());
197    assertEquals("rootProc-execute", state.get(0));
198    assertEquals("subProc1-execute", state.get(1));
199    assertEquals("subProc2-execute", state.get(2));
200    assertEquals("subProc2-rollback", state.get(3));
201    assertEquals("subProc1-rollback", state.get(4));
202    assertEquals("rootProc-rollback", state.get(5));
203  }
204
205  public static class TestFaultyRollback extends SequentialProcedure<Void> {
206    private int retries = 0;
207
208    public TestFaultyRollback() { }
209
210    @Override
211    protected Procedure<Void>[] execute(Void env) {
212      setFailure("faulty-rollback-test", new TestProcedureException("test faulty rollback"));
213      return null;
214    }
215
216    @Override
217    protected void rollback(Void env) throws IOException {
218      if (++retries < 3) {
219        LOG.info("inject rollback failure " + retries);
220        throw new IOException("injected failure number " + retries);
221      }
222      LOG.info("execute non faulty rollback step retries=" + retries);
223    }
224
225    @Override
226    protected boolean abort(Void env) { return false; }
227  }
228
229  @Test
230  public void testRollbackRetriableFailure() {
231    long procId = ProcedureTestingUtility.submitAndWait(procExecutor, new TestFaultyRollback());
232
233    Procedure<?> result = procExecutor.getResult(procId);
234    assertTrue("expected a failure", result.isFailed());
235    LOG.info(result.getException().getMessage());
236    Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
237    assertTrue("expected TestProcedureException, got " + cause,
238      cause instanceof TestProcedureException);
239  }
240
241  public static class TestWaitingProcedure extends SequentialProcedure<Void> {
242    private final List<String> state;
243    private final boolean hasChild;
244    private final String name;
245
246    public TestWaitingProcedure() {
247      throw new UnsupportedOperationException("recovery should not be triggered here");
248    }
249
250    public TestWaitingProcedure(String name, List<String> state, boolean hasChild) {
251      this.hasChild = hasChild;
252      this.state = state;
253      this.name = name;
254    }
255
256    @Override
257    protected Procedure<Void>[] execute(Void env) {
258      state.add(name + "-execute");
259      setState(ProcedureState.WAITING_TIMEOUT);
260      return hasChild ? new Procedure[] { new TestWaitChild(name, state) } : null;
261    }
262
263    @Override
264    protected void rollback(Void env) {
265      state.add(name + "-rollback");
266    }
267
268    @Override
269    protected boolean abort(Void env) {
270      state.add(name + "-abort");
271      return true;
272    }
273
274    public static class TestWaitChild extends SequentialProcedure<Void> {
275      private final List<String> state;
276      private final String name;
277
278      public TestWaitChild() {
279        throw new UnsupportedOperationException("recovery should not be triggered here");
280      }
281
282      public TestWaitChild(String name, List<String> state) {
283        this.name = name;
284        this.state = state;
285      }
286
287      @Override
288      protected Procedure<Void>[] execute(Void env) {
289        state.add(name + "-child-execute");
290        return null;
291      }
292
293      @Override
294      protected void rollback(Void env) {
295        throw new UnsupportedOperationException("should not rollback a successful child procedure");
296      }
297
298      @Override
299      protected boolean abort(Void env) {
300        state.add(name + "-child-abort");
301        return true;
302      }
303    }
304  }
305
306  @Test
307  public void testAbortTimeout() {
308    final int PROC_TIMEOUT_MSEC = 2500;
309    List<String> state = new ArrayList<>();
310    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, false);
311    proc.setTimeout(PROC_TIMEOUT_MSEC);
312    long startTime = EnvironmentEdgeManager.currentTime();
313    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
314    long execTime = EnvironmentEdgeManager.currentTime() - startTime;
315    LOG.info(Objects.toString(state));
316    assertTrue("we didn't wait enough execTime=" + execTime, execTime >= PROC_TIMEOUT_MSEC);
317    Procedure<?> result = procExecutor.getResult(rootId);
318    assertTrue(state.toString(), result.isFailed());
319    ProcedureTestingUtility.assertIsTimeoutException(result);
320    assertEquals(state.toString(), 2, state.size());
321    assertEquals("wproc-execute", state.get(0));
322    assertEquals("wproc-rollback", state.get(1));
323  }
324
325  @Test
326  public void testAbortTimeoutWithChildren() {
327    List<String> state = new ArrayList<>();
328    Procedure<Void> proc = new TestWaitingProcedure("wproc", state, true);
329    proc.setTimeout(2500);
330    long rootId = ProcedureTestingUtility.submitAndWait(procExecutor, proc);
331    LOG.info(Objects.toString(state));
332    Procedure<?> result = procExecutor.getResult(rootId);
333    assertTrue(state.toString(), result.isFailed());
334    ProcedureTestingUtility.assertIsTimeoutException(result);
335    assertEquals(state.toString(), 3, state.size());
336    assertEquals("wproc-execute", state.get(0));
337    assertEquals("wproc-child-execute", state.get(1));
338    assertEquals("wproc-rollback", state.get(2));
339  }
340}