001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
028import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
030import org.apache.hadoop.hbase.testclassification.MasterTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.junit.After;
033import org.junit.Before;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
041
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
043
044@Category({MasterTests.class, SmallTests.class})
045public class TestProcedureEvents {
046
047  @ClassRule
048  public static final HBaseClassTestRule CLASS_RULE =
049      HBaseClassTestRule.forClass(TestProcedureEvents.class);
050
051  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class);
052
053  private TestProcEnv procEnv;
054  private ProcedureStore procStore;
055  private ProcedureExecutor<TestProcEnv> procExecutor;
056
057  private HBaseCommonTestingUtility htu;
058  private FileSystem fs;
059  private Path logDir;
060
061  @Before
062  public void setUp() throws IOException {
063    htu = new HBaseCommonTestingUtility();
064    Path testDir = htu.getDataTestDir();
065    fs = testDir.getFileSystem(htu.getConfiguration());
066    logDir = new Path(testDir, "proc-logs");
067
068    procEnv = new TestProcEnv();
069    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
070    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
071    procStore.start(1);
072    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
073  }
074
075  @After
076  public void tearDown() throws IOException {
077    procExecutor.stop();
078    procStore.stop(false);
079    procExecutor.join();
080    fs.delete(logDir, true);
081  }
082
083  /**
084   * Tests being able to suspend a Procedure for N timeouts and then failing.s
085   * Resets the timeout after each elapses. See {@link TestTimeoutEventProcedure} for example
086   * of how to do this sort of trickery with the ProcedureExecutor; i.e. suspend for a while,
087   * check for a condition and if not set, suspend again, etc., ultimately failing or succeeding
088   * eventually.
089   */
090  @Test
091  public void testTimeoutEventProcedure() throws Exception {
092    final int NTIMEOUTS = 5;
093
094    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS);
095    procExecutor.submitProcedure(proc);
096
097    ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId());
098    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
099    assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount());
100  }
101
102  @Test
103  public void testTimeoutEventProcedureDoubleExecution() throws Exception {
104    testTimeoutEventProcedureDoubleExecution(false);
105  }
106
107  @Test
108  public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception {
109    testTimeoutEventProcedureDoubleExecution(true);
110  }
111
112  private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended)
113      throws Exception {
114    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3);
115    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
116    ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended);
117    long procId = procExecutor.submitProcedure(proc);
118    ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true);
119    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
120  }
121
122  /**
123   * This Event+Procedure exhibits following behavior:
124   * <ul>
125   *   <li>On procedure execute()
126   *     <ul>
127   *       <li>If had enough timeouts, abort the procedure. Else....</li>
128   *       <li>Suspend the event and add self to its suspend queue</li>
129   *       <li>Go into waiting state</li>
130   *     </ul>
131   *   </li>
132   *   <li>
133   *     On waiting timeout
134   *     <ul>
135   *       <li>Wake the event (which adds this procedure back into scheduler queue), and set own's
136   *       state to RUNNABLE (so can be executed again).</li>
137   *     </ul>
138   *   </li>
139   * </ul>
140   */
141  public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
142    private final ProcedureEvent event = new ProcedureEvent("timeout-event");
143
144    private final AtomicInteger ntimeouts = new AtomicInteger(0);
145    private int maxTimeouts = 1;
146
147    public TestTimeoutEventProcedure() {}
148
149    public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
150      this.maxTimeouts = maxTimeouts;
151      setTimeout(timeoutMsec);
152    }
153
154    public int getTimeoutsCount() {
155      return ntimeouts.get();
156    }
157
158    @Override
159    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
160      LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
161      if (ntimeouts.get() > maxTimeouts) {
162        setAbortFailure("test", "give up after " + ntimeouts.get());
163        return null;
164      }
165
166      event.suspend();
167      if (event.suspendIfNotReady(this)) {
168        setState(ProcedureState.WAITING_TIMEOUT);
169        throw new ProcedureSuspendedException();
170      }
171
172      return null;
173    }
174
175    @Override
176    protected synchronized boolean setTimeoutFailure(final TestProcEnv env) {
177      int n = ntimeouts.incrementAndGet();
178      LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
179      setState(ProcedureState.RUNNABLE);
180      event.wake((AbstractProcedureScheduler) env.getProcedureScheduler());
181      return false;
182    }
183
184    @Override
185    protected void afterReplay(final TestProcEnv env) {
186      if (getState() == ProcedureState.WAITING_TIMEOUT) {
187        event.suspend();
188        event.suspendIfNotReady(this);
189      }
190    }
191
192    @Override
193    protected void serializeStateData(ProcedureStateSerializer serializer)
194        throws IOException {
195      Int32Value.Builder ntimeoutsBuilder = Int32Value.newBuilder().setValue(ntimeouts.get());
196      serializer.serialize(ntimeoutsBuilder.build());
197
198      Int32Value.Builder maxTimeoutsBuilder = Int32Value.newBuilder().setValue(maxTimeouts);
199      serializer.serialize(maxTimeoutsBuilder.build());
200    }
201
202    @Override
203    protected void deserializeStateData(ProcedureStateSerializer serializer)
204        throws IOException {
205      Int32Value ntimeoutsValue = serializer.deserialize(Int32Value.class);
206      ntimeouts.set(ntimeoutsValue.getValue());
207
208      Int32Value maxTimeoutsValue = serializer.deserialize(Int32Value.class);
209      maxTimeouts = maxTimeoutsValue.getValue();
210    }
211  }
212
213  private class TestProcEnv {
214    public ProcedureScheduler getProcedureScheduler() {
215      return procExecutor.getScheduler();
216    }
217  }
218}