001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.fs.FileSystem;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
027import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
029import org.apache.hadoop.hbase.testclassification.MasterTests;
030import org.apache.hadoop.hbase.testclassification.MediumTests;
031import org.junit.jupiter.api.AfterEach;
032import org.junit.jupiter.api.BeforeEach;
033import org.junit.jupiter.api.Tag;
034import org.junit.jupiter.api.Test;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
041
042@Tag(MasterTests.TAG)
043@Tag(MediumTests.TAG)
044public class TestProcedureEvents {
045
046  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class);
047
048  private TestProcEnv procEnv;
049  private ProcedureStore procStore;
050  private ProcedureExecutor<TestProcEnv> procExecutor;
051
052  private HBaseCommonTestingUtil htu;
053  private FileSystem fs;
054  private Path logDir;
055
056  @BeforeEach
057  public void setUp() throws IOException {
058    htu = new HBaseCommonTestingUtil();
059    Path testDir = htu.getDataTestDir();
060    fs = testDir.getFileSystem(htu.getConfiguration());
061    logDir = new Path(testDir, "proc-logs");
062
063    procEnv = new TestProcEnv();
064    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
065    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore);
066    procStore.start(1);
067    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true);
068  }
069
070  @AfterEach
071  public void tearDown() throws IOException {
072    procExecutor.stop();
073    procStore.stop(false);
074    procExecutor.join();
075    fs.delete(logDir, true);
076  }
077
078  /**
079   * Tests being able to suspend a Procedure for N timeouts and then failing.s Resets the timeout
080   * after each elapses. See {@link TestTimeoutEventProcedure} for example of how to do this sort of
081   * trickery with the ProcedureExecutor; i.e. suspend for a while, check for a condition and if not
082   * set, suspend again, etc., ultimately failing or succeeding eventually.
083   */
084  @Test
085  public void testTimeoutEventProcedure() throws Exception {
086    final int NTIMEOUTS = 5;
087
088    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(500, NTIMEOUTS);
089    procExecutor.submitProcedure(proc);
090
091    ProcedureTestingUtility.waitProcedure(procExecutor, proc.getProcId());
092    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
093    assertEquals(NTIMEOUTS + 1, proc.getTimeoutsCount());
094  }
095
096  @Test
097  public void testTimeoutEventProcedureDoubleExecution() throws Exception {
098    testTimeoutEventProcedureDoubleExecution(false);
099  }
100
101  @Test
102  public void testTimeoutEventProcedureDoubleExecutionKillIfSuspended() throws Exception {
103    testTimeoutEventProcedureDoubleExecution(true);
104  }
105
106  private void testTimeoutEventProcedureDoubleExecution(final boolean killIfSuspended)
107    throws Exception {
108    TestTimeoutEventProcedure proc = new TestTimeoutEventProcedure(1000, 3);
109    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExecutor, true);
110    ProcedureTestingUtility.setKillIfSuspended(procExecutor, killIfSuspended);
111    long procId = procExecutor.submitProcedure(proc);
112    ProcedureTestingUtility.testRecoveryAndDoubleExecution(procExecutor, procId, true);
113    ProcedureTestingUtility.assertIsAbortException(procExecutor.getResult(proc.getProcId()));
114  }
115
116  /**
117   * This Event+Procedure exhibits following behavior:
118   * <ul>
119   * <li>On procedure execute()
120   * <ul>
121   * <li>If had enough timeouts, abort the procedure. Else....</li>
122   * <li>Suspend the event and add self to its suspend queue</li>
123   * <li>Go into waiting state</li>
124   * </ul>
125   * </li>
126   * <li>On waiting timeout
127   * <ul>
128   * <li>Wake the event (which adds this procedure back into scheduler queue), and set own's state
129   * to RUNNABLE (so can be executed again).</li>
130   * </ul>
131   * </li>
132   * </ul>
133   */
134  public static class TestTimeoutEventProcedure extends NoopProcedure<TestProcEnv> {
135    private final ProcedureEvent event = new ProcedureEvent("timeout-event");
136
137    private final AtomicInteger ntimeouts = new AtomicInteger(0);
138    private int maxTimeouts = 1;
139
140    public TestTimeoutEventProcedure() {
141    }
142
143    public TestTimeoutEventProcedure(final int timeoutMsec, final int maxTimeouts) {
144      this.maxTimeouts = maxTimeouts;
145      setTimeout(timeoutMsec);
146    }
147
148    public int getTimeoutsCount() {
149      return ntimeouts.get();
150    }
151
152    @Override
153    protected Procedure[] execute(final TestProcEnv env) throws ProcedureSuspendedException {
154      LOG.info("EXECUTE " + this + " ntimeouts=" + ntimeouts);
155      if (ntimeouts.get() > maxTimeouts) {
156        setAbortFailure("test", "give up after " + ntimeouts.get());
157        return null;
158      }
159
160      event.suspend();
161      if (event.suspendIfNotReady(this)) {
162        setState(ProcedureState.WAITING_TIMEOUT);
163        throw new ProcedureSuspendedException();
164      }
165
166      return null;
167    }
168
169    @Override
170    protected synchronized boolean setTimeoutFailure(final TestProcEnv env) {
171      int n = ntimeouts.incrementAndGet();
172      LOG.info("HANDLE TIMEOUT {} ntimeouts={}", this, n);
173      setState(ProcedureState.RUNNABLE);
174      event.wake((AbstractProcedureScheduler) env.getProcedureScheduler());
175      return false;
176    }
177
178    @Override
179    protected void afterReplay(final TestProcEnv env) {
180      if (getState() == ProcedureState.WAITING_TIMEOUT) {
181        event.suspend();
182        event.suspendIfNotReady(this);
183      }
184    }
185
186    @Override
187    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
188      Int32Value.Builder ntimeoutsBuilder = Int32Value.newBuilder().setValue(ntimeouts.get());
189      serializer.serialize(ntimeoutsBuilder.build());
190
191      Int32Value.Builder maxTimeoutsBuilder = Int32Value.newBuilder().setValue(maxTimeouts);
192      serializer.serialize(maxTimeoutsBuilder.build());
193    }
194
195    @Override
196    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
197      Int32Value ntimeoutsValue = serializer.deserialize(Int32Value.class);
198      ntimeouts.set(ntimeoutsValue.getValue());
199
200      Int32Value maxTimeoutsValue = serializer.deserialize(Int32Value.class);
201      maxTimeouts = maxTimeoutsValue.getValue();
202    }
203  }
204
205  private class TestProcEnv {
206    public ProcedureScheduler getProcedureScheduler() {
207      return procExecutor.getScheduler();
208    }
209  }
210}