001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
028import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
030import org.apache.hadoop.hbase.testclassification.MasterTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.apache.hadoop.hbase.util.Threads;
033import org.junit.After;
034import org.junit.Before;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@Category({MasterTests.class, SmallTests.class})
042public class TestProcedureSuspended {
043
  // Class-level JUnit rule built for this test class via HBaseClassTestRule.forClass().
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestProcedureSuspended.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureSuspended.class);

  // Passed both to procStore.start() and to initAndStartWorkers() in setUp().
  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
  // NOTE(review): not referenced anywhere in the visible part of this file -- confirm before
  // removing.
  private static final Procedure NULL_PROC = null;

  // Executor under test; created in setUp() and stopped in tearDown().
  private ProcedureExecutor<TestProcEnv> procExecutor;
  // Store handed to the executor; setUp() assigns a NoopProcedureStore.
  private ProcedureStore procStore;

  // Source of the configuration object passed to the ProcedureExecutor constructor.
  private HBaseCommonTestingUtility htu;
057
058  @Before
059  public void setUp() throws IOException {
060    htu = new HBaseCommonTestingUtility();
061
062    procStore = new NoopProcedureStore();
063    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
064    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
065    ProcedureTestingUtility
066        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true);
067  }
068
069  @After
070  public void tearDown() throws IOException {
071    procExecutor.stop();
072    procStore.stop(false);
073  }
074
  /**
   * Submits three procedures that are all configured to throw
   * {@link ProcedureSuspendedException} from {@code execute()}: two of them share
   * {@code lockA} ("keyA") and one uses {@code lockB} ("keyB"). The test then resumes or fails
   * them one at a time and checks, through the timestamps each procedure records and through
   * the two lock flags, which procedure ran and which one still owns its lock.
   */
  @Test
  public void testSuspendWhileHoldingLocks() {
    final AtomicBoolean lockA = new AtomicBoolean(false);
    final AtomicBoolean lockB = new AtomicBoolean(false);

    final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true);
    final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true);
    final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true);

    procExecutor.submitProcedure(p1keyA);
    procExecutor.submitProcedure(p2keyA);
    procExecutor.submitProcedure(p3keyB);

    // first run: p1 and p3 are able to run, p2 is blocked by p1.
    // (an expected size of 0 makes the helper skip both the wait and the last-timestamp check)
    waitAndAssertTimestamp(p1keyA, 1, 1);
    waitAndAssertTimestamp(p2keyA, 0, -1);
    waitAndAssertTimestamp(p3keyB, 1, 2);
    assertEquals(true, lockA.get());
    assertEquals(true, lockB.get());

    // release p3: stop it from suspending and put it back at the front of the scheduler
    p3keyB.setThrowSuspend(false);
    procExecutor.getScheduler().addFront(p3keyB);
    waitAndAssertTimestamp(p1keyA, 1, 1);
    waitAndAssertTimestamp(p2keyA, 0, -1);
    waitAndAssertTimestamp(p3keyB, 2, 3);
    assertEquals(true, lockA.get());

    // wait until p3 is fully completed; its lock flag must be cleared afterwards
    ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB);
    assertEquals(false, lockB.get());

    // rollback p1 and wait until it is fully completed
    p1keyA.setTriggerRollback(true);
    procExecutor.getScheduler().addFront(p1keyA);
    ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);

    // p2 should start and suspend.
    // p1's last entry is 60000 because rollback() records nextTimestamp() * 10000.
    waitAndAssertTimestamp(p1keyA, 4, 60000);
    waitAndAssertTimestamp(p2keyA, 1, 7);
    waitAndAssertTimestamp(p3keyB, 2, 3);
    assertEquals(true, lockA.get());

    // let p2 run to the end and wait until it is fully completed; both flags end up cleared
    p2keyA.setThrowSuspend(false);
    procExecutor.getScheduler().addFront(p2keyA);
    ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
    waitAndAssertTimestamp(p1keyA, 4, 60000);
    waitAndAssertTimestamp(p2keyA, 2, 8);
    waitAndAssertTimestamp(p3keyB, 2, 3);
    assertEquals(false, lockA.get());
    assertEquals(false, lockB.get());
  }
128
129  @Test
130  public void testYieldWhileHoldingLocks() {
131    final AtomicBoolean lock = new AtomicBoolean(false);
132
133    final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false);
134    final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false);
135
136    procExecutor.submitProcedure(p1);
137    procExecutor.submitProcedure(p2);
138
139    // try to execute a bunch of yield on p1, p2 should be blocked
140    while (p1.getTimestamps().size() < 100) Threads.sleep(10);
141    assertEquals(0, p2.getTimestamps().size());
142
143    // wait until p1 is completed
144    p1.setThrowYield(false);
145    ProcedureTestingUtility.waitProcedure(procExecutor, p1);
146
147    // try to execute a bunch of yield on p2
148    while (p2.getTimestamps().size() < 100) Threads.sleep(10);
149    assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1,
150      p2.getTimestamps().get(0).longValue());
151
152    // wait until p2 is completed
153    p1.setThrowYield(false);
154    ProcedureTestingUtility.waitProcedure(procExecutor, p1);
155  }
156
157  private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) {
158    final ArrayList<Long> timestamps = proc.getTimestamps();
159    while (timestamps.size() < size) Threads.sleep(10);
160    LOG.info(proc + " -> " + timestamps);
161    assertEquals(size, timestamps.size());
162    if (size > 0) {
163      assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue());
164    }
165  }
166
167  public static class TestLockProcedure extends Procedure<TestProcEnv> {
168    private final ArrayList<Long> timestamps = new ArrayList<>();
169    private final String key;
170
171    private boolean triggerRollback = false;
172    private boolean throwSuspend = false;
173    private boolean throwYield = false;
174    private AtomicBoolean lock = null;
175    private boolean hasLock = false;
176
177    public TestLockProcedure(final AtomicBoolean lock, final String key,
178        final boolean throwYield, final boolean throwSuspend) {
179      this.lock = lock;
180      this.key = key;
181      this.throwYield = throwYield;
182      this.throwSuspend = throwSuspend;
183    }
184
185    public void setThrowYield(final boolean throwYield) {
186      this.throwYield = throwYield;
187    }
188
189    public void setThrowSuspend(final boolean throwSuspend) {
190      this.throwSuspend = throwSuspend;
191    }
192
193    public void setTriggerRollback(final boolean triggerRollback) {
194      this.triggerRollback = triggerRollback;
195    }
196
197    @Override
198    protected Procedure[] execute(final TestProcEnv env)
199        throws ProcedureYieldException, ProcedureSuspendedException {
200      LOG.info("EXECUTE " + this + " suspend " + (lock != null));
201      timestamps.add(env.nextTimestamp());
202      if (triggerRollback) {
203        setFailure(getClass().getSimpleName(), new Exception("injected failure"));
204      } else if (throwYield) {
205        throw new ProcedureYieldException();
206      } else if (throwSuspend) {
207        throw new ProcedureSuspendedException();
208      }
209      return null;
210    }
211
212    @Override
213    protected void rollback(final TestProcEnv env) {
214      LOG.info("ROLLBACK " + this);
215      timestamps.add(env.nextTimestamp() * 10000);
216    }
217
218    @Override
219    protected LockState acquireLock(final TestProcEnv env) {
220      if ((hasLock = lock.compareAndSet(false, true))) {
221        LOG.info("ACQUIRE LOCK " + this + " " + (hasLock));
222        return LockState.LOCK_ACQUIRED;
223      }
224      return LockState.LOCK_YIELD_WAIT;
225    }
226
227    @Override
228    protected void releaseLock(final TestProcEnv env) {
229      LOG.info("RELEASE LOCK " + this + " " + hasLock);
230      lock.set(false);
231    }
232
233    @Override
234    protected boolean holdLock(final TestProcEnv env) {
235      return true;
236    }
237
238    public ArrayList<Long> getTimestamps() {
239      return timestamps;
240    }
241
242    @Override
243    protected void toStringClassDetails(StringBuilder builder) {
244      builder.append(getClass().getName());
245      builder.append("(" + key + ")");
246    }
247
248    @Override
249    protected boolean abort(TestProcEnv env) { return false; }
250
251    @Override
252    protected void serializeStateData(ProcedureStateSerializer serializer)
253        throws IOException {
254    }
255
256    @Override
257    protected void deserializeStateData(ProcedureStateSerializer serializer)
258        throws IOException {
259    }
260  }
261
262  private static class TestProcEnv {
263    public final AtomicLong timestamp = new AtomicLong(0);
264
265    public long nextTimestamp() {
266      return timestamp.incrementAndGet();
267    }
268  }
269}