001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
028import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
030import org.apache.hadoop.hbase.testclassification.MasterTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.apache.hadoop.hbase.util.Threads;
033import org.junit.After;
034import org.junit.Before;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@Category({MasterTests.class, SmallTests.class})
042public class TestProcedureSuspended {
043  @ClassRule
044  public static final HBaseClassTestRule CLASS_RULE =
045      HBaseClassTestRule.forClass(TestProcedureSuspended.class);
046
047  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureSuspended.class);
048
049  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
050  private static final Procedure NULL_PROC = null;
051
052  private ProcedureExecutor<TestProcEnv> procExecutor;
053  private ProcedureStore procStore;
054
055  private HBaseCommonTestingUtility htu;
056
057  @Before
058  public void setUp() throws IOException {
059    htu = new HBaseCommonTestingUtility();
060
061    procStore = new NoopProcedureStore();
062    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
063    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
064    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
065  }
066
067  @After
068  public void tearDown() throws IOException {
069    procExecutor.stop();
070    procStore.stop(false);
071  }
072
073  @Test
074  public void testSuspendWhileHoldingLocks() {
075    final AtomicBoolean lockA = new AtomicBoolean(false);
076    final AtomicBoolean lockB = new AtomicBoolean(false);
077
078    final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true);
079    final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true);
080    final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true);
081
082    procExecutor.submitProcedure(p1keyA);
083    procExecutor.submitProcedure(p2keyA);
084    procExecutor.submitProcedure(p3keyB);
085
086    // first run p1, p3 are able to run p2 is blocked by p1
087    waitAndAssertTimestamp(p1keyA, 1, 1);
088    waitAndAssertTimestamp(p2keyA, 0, -1);
089    waitAndAssertTimestamp(p3keyB, 1, 2);
090    assertEquals(true, lockA.get());
091    assertEquals(true, lockB.get());
092
093    // release p3
094    p3keyB.setThrowSuspend(false);
095    procExecutor.getScheduler().addFront(p3keyB);
096    waitAndAssertTimestamp(p1keyA, 1, 1);
097    waitAndAssertTimestamp(p2keyA, 0, -1);
098    waitAndAssertTimestamp(p3keyB, 2, 3);
099    assertEquals(true, lockA.get());
100
101    // wait until p3 is fully completed
102    ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB);
103    assertEquals(false, lockB.get());
104
105    // rollback p2 and wait until is fully completed
106    p1keyA.setTriggerRollback(true);
107    procExecutor.getScheduler().addFront(p1keyA);
108    ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);
109
110    // p2 should start and suspend
111    waitAndAssertTimestamp(p1keyA, 4, 60000);
112    waitAndAssertTimestamp(p2keyA, 1, 7);
113    waitAndAssertTimestamp(p3keyB, 2, 3);
114    assertEquals(true, lockA.get());
115
116    // wait until p2 is fully completed
117    p2keyA.setThrowSuspend(false);
118    procExecutor.getScheduler().addFront(p2keyA);
119    ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
120    waitAndAssertTimestamp(p1keyA, 4, 60000);
121    waitAndAssertTimestamp(p2keyA, 2, 8);
122    waitAndAssertTimestamp(p3keyB, 2, 3);
123    assertEquals(false, lockA.get());
124    assertEquals(false, lockB.get());
125  }
126
127  @Test
128  public void testYieldWhileHoldingLocks() {
129    final AtomicBoolean lock = new AtomicBoolean(false);
130
131    final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false);
132    final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false);
133
134    procExecutor.submitProcedure(p1);
135    procExecutor.submitProcedure(p2);
136
137    // try to execute a bunch of yield on p1, p2 should be blocked
138    while (p1.getTimestamps().size() < 100) {
139      Threads.sleep(10);
140    }
141
142    assertEquals(0, p2.getTimestamps().size());
143
144    // wait until p1 is completed
145    p1.setThrowYield(false);
146    ProcedureTestingUtility.waitProcedure(procExecutor, p1);
147
148    // try to execute a bunch of yield on p2
149    while (p2.getTimestamps().size() < 100) {
150      Threads.sleep(10);
151    }
152
153    assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1,
154      p2.getTimestamps().get(0).longValue());
155
156    // wait until p2 is completed
157    p1.setThrowYield(false);
158    ProcedureTestingUtility.waitProcedure(procExecutor, p1);
159  }
160
161  private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) {
162    final ArrayList<Long> timestamps = proc.getTimestamps();
163    while (timestamps.size() < size) {
164      Threads.sleep(10);
165    }
166
167    LOG.info(proc + " -> " + timestamps);
168    assertEquals(size, timestamps.size());
169    if (size > 0) {
170      assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue());
171    }
172  }
173
174  public static class TestLockProcedure extends Procedure<TestProcEnv> {
175    private final ArrayList<Long> timestamps = new ArrayList<>();
176    private final String key;
177
178    private boolean triggerRollback = false;
179    private boolean throwSuspend = false;
180    private boolean throwYield = false;
181    private AtomicBoolean lock = null;
182    private boolean hasLock = false;
183
184    public TestLockProcedure(final AtomicBoolean lock, final String key,
185        final boolean throwYield, final boolean throwSuspend) {
186      this.lock = lock;
187      this.key = key;
188      this.throwYield = throwYield;
189      this.throwSuspend = throwSuspend;
190    }
191
192    public void setThrowYield(final boolean throwYield) {
193      this.throwYield = throwYield;
194    }
195
196    public void setThrowSuspend(final boolean throwSuspend) {
197      this.throwSuspend = throwSuspend;
198    }
199
200    public void setTriggerRollback(final boolean triggerRollback) {
201      this.triggerRollback = triggerRollback;
202    }
203
204    @Override
205    protected Procedure[] execute(final TestProcEnv env)
206        throws ProcedureYieldException, ProcedureSuspendedException {
207      LOG.info("EXECUTE " + this + " suspend " + (lock != null));
208      timestamps.add(env.nextTimestamp());
209      if (triggerRollback) {
210        setFailure(getClass().getSimpleName(), new Exception("injected failure"));
211      } else if (throwYield) {
212        throw new ProcedureYieldException();
213      } else if (throwSuspend) {
214        throw new ProcedureSuspendedException();
215      }
216      return null;
217    }
218
219    @Override
220    protected void rollback(final TestProcEnv env) {
221      LOG.info("ROLLBACK " + this);
222      timestamps.add(env.nextTimestamp() * 10000);
223    }
224
225    @Override
226    protected LockState acquireLock(final TestProcEnv env) {
227      hasLock = lock.compareAndSet(false, true);
228      if (hasLock) {
229        LOG.info("ACQUIRE LOCK " + this + " " + (hasLock));
230        return LockState.LOCK_ACQUIRED;
231      }
232      return LockState.LOCK_YIELD_WAIT;
233    }
234
235    @Override
236    protected void releaseLock(final TestProcEnv env) {
237      LOG.info("RELEASE LOCK " + this + " " + hasLock);
238      lock.set(false);
239    }
240
241    @Override
242    protected boolean holdLock(final TestProcEnv env) {
243      return true;
244    }
245
246    public ArrayList<Long> getTimestamps() {
247      return timestamps;
248    }
249
250    @Override
251    protected void toStringClassDetails(StringBuilder builder) {
252      builder.append(getClass().getName());
253      builder.append("(" + key + ")");
254    }
255
256    @Override
257    protected boolean abort(TestProcEnv env) {
258      return false;
259    }
260
261    @Override
262    protected void serializeStateData(ProcedureStateSerializer serializer)
263        throws IOException {
264    }
265
266    @Override
267    protected void deserializeStateData(ProcedureStateSerializer serializer)
268        throws IOException {
269    }
270  }
271
272  private static class TestProcEnv {
273    public final AtomicLong timestamp = new AtomicLong(0);
274
275    public long nextTimestamp() {
276      return timestamp.incrementAndGet();
277    }
278  }
279}