001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
029import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
031import org.apache.hadoop.hbase.testclassification.MasterTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.apache.hadoop.hbase.util.Threads;
034import org.junit.jupiter.api.AfterEach;
035import org.junit.jupiter.api.BeforeEach;
036import org.junit.jupiter.api.Tag;
037import org.junit.jupiter.api.Test;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@Tag(MasterTests.TAG)
042@Tag(SmallTests.TAG)
043public class TestProcedureSuspended {
044
045  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureSuspended.class);
046
047  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
048  private static final Procedure NULL_PROC = null;
049
050  private ProcedureExecutor<TestProcEnv> procExecutor;
051  private ProcedureStore procStore;
052
053  private HBaseCommonTestingUtil htu;
054
055  @BeforeEach
056  public void setUp() throws IOException {
057    htu = new HBaseCommonTestingUtil();
058
059    procStore = new NoopProcedureStore();
060    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
061    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
062    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
063  }
064
065  @AfterEach
066  public void tearDown() throws IOException {
067    procExecutor.stop();
068    procStore.stop(false);
069  }
070
071  @Test
072  public void testSuspendWhileHoldingLocks() {
073    final AtomicBoolean lockA = new AtomicBoolean(false);
074    final AtomicBoolean lockB = new AtomicBoolean(false);
075
076    final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true);
077    final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true);
078    final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true);
079
080    procExecutor.submitProcedure(p1keyA);
081    procExecutor.submitProcedure(p2keyA);
082    procExecutor.submitProcedure(p3keyB);
083
084    // first run p1, p3 are able to run p2 is blocked by p1
085    waitAndAssertTimestamp(p1keyA, 1, 1);
086    waitAndAssertTimestamp(p2keyA, 0, -1);
087    waitAndAssertTimestamp(p3keyB, 1, 2);
088    assertTrue(lockA.get());
089    assertTrue(lockB.get());
090
091    // release p3
092    p3keyB.setThrowSuspend(false);
093    procExecutor.getScheduler().addFront(p3keyB);
094    waitAndAssertTimestamp(p1keyA, 1, 1);
095    waitAndAssertTimestamp(p2keyA, 0, -1);
096    waitAndAssertTimestamp(p3keyB, 2, 3);
097    assertTrue(lockA.get());
098
099    // wait until p3 is fully completed
100    ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB);
101    assertFalse(lockB.get());
102
103    // rollback p2 and wait until is fully completed
104    p1keyA.setTriggerRollback(true);
105    procExecutor.getScheduler().addFront(p1keyA);
106    ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);
107
108    // p2 should start and suspend
109    waitAndAssertTimestamp(p1keyA, 4, 60000);
110    waitAndAssertTimestamp(p2keyA, 1, 7);
111    waitAndAssertTimestamp(p3keyB, 2, 3);
112    assertTrue(lockA.get());
113
114    // wait until p2 is fully completed
115    p2keyA.setThrowSuspend(false);
116    procExecutor.getScheduler().addFront(p2keyA);
117    ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
118    waitAndAssertTimestamp(p1keyA, 4, 60000);
119    waitAndAssertTimestamp(p2keyA, 2, 8);
120    waitAndAssertTimestamp(p3keyB, 2, 3);
121    assertFalse(lockA.get());
122    assertFalse(lockB.get());
123  }
124
125  @Test
126  public void testYieldWhileHoldingLocks() {
127    final AtomicBoolean lock = new AtomicBoolean(false);
128
129    final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false);
130    final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false);
131
132    procExecutor.submitProcedure(p1);
133    procExecutor.submitProcedure(p2);
134
135    // try to execute a bunch of yield on p1, p2 should be blocked
136    while (p1.getTimestamps().size() < 100) {
137      Threads.sleep(10);
138    }
139
140    assertEquals(0, p2.getTimestamps().size());
141
142    // wait until p1 is completed
143    p1.setThrowYield(false);
144    ProcedureTestingUtility.waitProcedure(procExecutor, p1);
145
146    // try to execute a bunch of yield on p2
147    while (p2.getTimestamps().size() < 100) {
148      Threads.sleep(10);
149    }
150
151    assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1,
152      p2.getTimestamps().get(0).longValue());
153
154    // wait until p2 is completed
155    p1.setThrowYield(false);
156    ProcedureTestingUtility.waitProcedure(procExecutor, p1);
157  }
158
159  private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) {
160    final ArrayList<Long> timestamps = proc.getTimestamps();
161    while (timestamps.size() < size) {
162      Threads.sleep(10);
163    }
164
165    LOG.info("{} -> {}", proc, timestamps);
166    assertEquals(size, timestamps.size());
167    if (size > 0) {
168      assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue());
169    }
170  }
171
172  public static class TestLockProcedure extends Procedure<TestProcEnv> {
173    private final ArrayList<Long> timestamps = new ArrayList<>();
174    private final String key;
175
176    private boolean triggerRollback = false;
177    private boolean throwSuspend = false;
178    private boolean throwYield = false;
179    private AtomicBoolean lock = null;
180    private boolean hasLock = false;
181
182    public TestLockProcedure(final AtomicBoolean lock, final String key, final boolean throwYield,
183      final boolean throwSuspend) {
184      this.lock = lock;
185      this.key = key;
186      this.throwYield = throwYield;
187      this.throwSuspend = throwSuspend;
188    }
189
190    public void setThrowYield(final boolean throwYield) {
191      this.throwYield = throwYield;
192    }
193
194    public void setThrowSuspend(final boolean throwSuspend) {
195      this.throwSuspend = throwSuspend;
196    }
197
198    public void setTriggerRollback(final boolean triggerRollback) {
199      this.triggerRollback = triggerRollback;
200    }
201
202    @Override
203    protected Procedure[] execute(final TestProcEnv env)
204      throws ProcedureYieldException, ProcedureSuspendedException {
205      LOG.info("EXECUTE {} suspend {}", this, lock != null);
206      timestamps.add(env.nextTimestamp());
207      if (triggerRollback) {
208        setFailure(getClass().getSimpleName(), new Exception("injected failure"));
209      } else if (throwYield) {
210        throw new ProcedureYieldException();
211      } else if (throwSuspend) {
212        throw new ProcedureSuspendedException();
213      }
214      return null;
215    }
216
217    @Override
218    protected void rollback(final TestProcEnv env) {
219      LOG.info("ROLLBACK {}", this);
220      timestamps.add(env.nextTimestamp() * 10000);
221    }
222
223    @Override
224    protected LockState acquireLock(final TestProcEnv env) {
225      hasLock = lock.compareAndSet(false, true);
226      if (hasLock) {
227        LOG.info("ACQUIRE LOCK {} {}", this, hasLock);
228        return LockState.LOCK_ACQUIRED;
229      }
230      return LockState.LOCK_YIELD_WAIT;
231    }
232
233    @Override
234    protected void releaseLock(final TestProcEnv env) {
235      LOG.info("RELEASE LOCK {} {}", this, hasLock);
236      lock.set(false);
237    }
238
239    @Override
240    protected boolean holdLock(final TestProcEnv env) {
241      return true;
242    }
243
244    public ArrayList<Long> getTimestamps() {
245      return timestamps;
246    }
247
248    @Override
249    protected void toStringClassDetails(StringBuilder builder) {
250      builder.append(getClass().getName());
251      builder.append("(" + key + ")");
252    }
253
254    @Override
255    protected boolean abort(TestProcEnv env) {
256      return false;
257    }
258
259    @Override
260    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
261    }
262
263    @Override
264    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
265    }
266  }
267
268  private static class TestProcEnv {
269    public final AtomicLong timestamp = new AtomicLong(0);
270
271    public long nextTimestamp() {
272      return timestamp.incrementAndGet();
273    }
274  }
275}