001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
028import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
030import org.apache.hadoop.hbase.testclassification.MasterTests;
031import org.apache.hadoop.hbase.testclassification.MediumTests;
032import org.junit.After;
033import org.junit.Before;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
041
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
043
044@Category({ MasterTests.class, MediumTests.class })
045public class TestProcedureEvents {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049    HBaseClassTestRule.forClass(TestProcedureEvents.class);
050
051  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class);
052
053  private TestProcEnv procEnv;
054  private ProcedureStore procStore;
055  private ProcedureExecutor<TestProcEnv> procExecutor;
056
057  private HBaseCommonTestingUtil htu;
058  private FileSystem fs;
059  private Path logDir;
060
061  @Before
062  public void setUp() throws IOException {
063    htu = new HBaseCommonTestingUtil();
064    Path testDir = htu.getDataTestDir();
065    fs = testDir.getFileSystem(htu.getConfiguration());
066    logDir = new Path(testDir, "proc-logs");
067
068    procEnv = new TestProcEnv();
069    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
070    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
071    procStore.start(1);
072    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
073  }
074
075  @After
076  public void tearDown() throws IOException {
077    procExecutor.stop();
078    procStore.stop(false);
079    procExecutor.join();
080    fs.delete(logDir, true);
081  }
082
083  /**
084   * Tests being able to suspend a Procedure for N timeouts and then failing.s Resets the timeout
085   * after each elapses. See {@link TestTimeoutEventProcedure} for example of how to do this sort of
086   * trickery with the ProcedureExecutor; i.e. suspend for a while, check for a condition and if not
087   * set, suspend again, etc., ultimately failing or succeeding eventually.
088   */
089  @Test
090  public void testTimeoutEventProcedure() throws Exception {
091    final int NTIMEOUTS = 5;
092
093    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS);
094    procExecutor.submitProcedure(proc);
095
096    ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId());
097    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
098    assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount());
099  }
100
101  @Test
102  public void testTimeoutEventProcedureDoubleExecution() throws Exception {
103    testTimeoutEventProcedureDoubleExecution(false);
104  }
105
106  @Test
107  public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception {
108    testTimeoutEventProcedureDoubleExecution(true);
109  }
110
111  private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended)
112    throws Exception {
113    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3);
114    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
115    ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended);
116    long procId = procExecutor.submitProcedure(proc);
117    ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true);
118    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
119  }
120
121  /**
122   * This Event+Procedure exhibits following behavior:
123   * <ul>
124   * <li>On procedure execute()
125   * <ul>
126   * <li>If had enough timeouts, abort the procedure. Else....</li>
127   * <li>Suspend the event and add self to its suspend queue</li>
128   * <li>Go into waiting state</li>
129   * </ul>
130   * </li>
131   * <li>On waiting timeout
132   * <ul>
133   * <li>Wake the event (which adds this procedure back into scheduler queue), and set own's state
134   * to RUNNABLE (so can be executed again).</li>
135   * </ul>
136   * </li>
137   * </ul>
138   */
139  public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
140    private final ProcedureEvent event = new ProcedureEvent("timeout-event");
141
142    private final AtomicInteger ntimeouts = new AtomicInteger(0);
143    private int maxTimeouts = 1;
144
145    public TestTimeoutEventProcedure() {
146    }
147
148    public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
149      this.maxTimeouts = maxTimeouts;
150      setTimeout(timeoutMsec);
151    }
152
153    public int getTimeoutsCount() {
154      return ntimeouts.get();
155    }
156
157    @Override
158    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
159      LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
160      if (ntimeouts.get() > maxTimeouts) {
161        setAbortFailure("test", "give up after " + ntimeouts.get());
162        return null;
163      }
164
165      event.suspend();
166      if (event.suspendIfNotReady(this)) {
167        setState(ProcedureState.WAITING_TIMEOUT);
168        throw new ProcedureSuspendedException();
169      }
170
171      return null;
172    }
173
174    @Override
175    protected synchronized boolean setTimeoutFailure(final TestProcEnv env) {
176      int n = ntimeouts.incrementAndGet();
177      LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
178      setState(ProcedureState.RUNNABLE);
179      event.wake((AbstractProcedureScheduler) env.getProcedureScheduler());
180      return false;
181    }
182
183    @Override
184    protected void afterReplay(final TestProcEnv env) {
185      if (getState() == ProcedureState.WAITING_TIMEOUT) {
186        event.suspend();
187        event.suspendIfNotReady(this);
188      }
189    }
190
191    @Override
192    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
193      Int32Value.Builder ntimeoutsBuilder = Int32Value.newBuilder().setValue(ntimeouts.get());
194      serializer.serialize(ntimeoutsBuilder.build());
195
196      Int32Value.Builder maxTimeoutsBuilder = Int32Value.newBuilder().setValue(maxTimeouts);
197      serializer.serialize(maxTimeoutsBuilder.build());
198    }
199
200    @Override
201    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
202      Int32Value ntimeoutsValue = serializer.deserialize(Int32Value.class);
203      ntimeouts.set(ntimeoutsValue.getValue());
204
205      Int32Value maxTimeoutsValue = serializer.deserialize(Int32Value.class);
206      maxTimeouts = maxTimeoutsValue.getValue();
207    }
208  }
209
210  private class TestProcEnv {
211    public ProcedureScheduler getProcedureScheduler() {
212      return procExecutor.getScheduler();
213    }
214  }
215}