/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks that a procedure whose {@code holdLock} returns {@code true} keeps its lock while it is
 * suspended or yielding: a second procedure contending for the same lock must not record any
 * execution until the holder has finished.
 * <p>
 * Every call to {@code execute}/{@code rollback} records a value taken from a counter shared
 * through {@link TestProcEnv}, so the assertions below can check the global order in which the
 * procedures ran.
 */
@Category({MasterTests.class, SmallTests.class})
public class TestProcedureSuspended {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestProcedureSuspended.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureSuspended.class);

  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
  private static final Procedure NULL_PROC = null;

  private ProcedureExecutor<TestProcEnv> procExecutor;
  private ProcedureStore procStore;

  private HBaseCommonTestingUtility htu;

  /** Creates a fresh executor backed by a no-op store and starts its workers. */
  @Before
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtility();

    procStore = new NoopProcedureStore();
    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore);
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    ProcedureTestingUtility
        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true);
  }

  /** Stops the executor first, then the store. */
  @After
  public void tearDown() throws IOException {
    procExecutor.stop();
    procStore.stop(false);
  }

  /**
   * Three suspending procedures: p1 and p2 share lock A, p3 uses lock B. p2 must not execute
   * until p1 (which suspended while holding lock A) has been rolled back and finished.
   */
  @Test
  public void testSuspendWhileHoldingLocks() {
    final AtomicBoolean lockA = new AtomicBoolean(false);
    final AtomicBoolean lockB = new AtomicBoolean(false);

    final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true);
    final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true);
    final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true);

    procExecutor.submitProcedure(p1keyA);
    procExecutor.submitProcedure(p2keyA);
    procExecutor.submitProcedure(p3keyB);

    // first run: p1 and p3 are able to run, p2 is blocked by p1
    waitAndAssertTimestamp(p1keyA, 1, 1);
    waitAndAssertTimestamp(p2keyA, 0, -1);
    waitAndAssertTimestamp(p3keyB, 1, 2);
    assertEquals(true, lockA.get());
    assertEquals(true, lockB.get());

    // release p3
    p3keyB.setThrowSuspend(false);
    procExecutor.getScheduler().addFront(p3keyB);
    waitAndAssertTimestamp(p1keyA, 1, 1);
    waitAndAssertTimestamp(p2keyA, 0, -1);
    waitAndAssertTimestamp(p3keyB, 2, 3);
    assertEquals(true, lockA.get());

    // wait until p3 is fully completed
    ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB);
    assertEquals(false, lockB.get());

    // rollback p1 and wait until it is fully completed
    p1keyA.setTriggerRollback(true);
    procExecutor.getScheduler().addFront(p1keyA);
    ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA);

    // p2 should start and suspend
    waitAndAssertTimestamp(p1keyA, 4, 60000);
    waitAndAssertTimestamp(p2keyA, 1, 7);
    waitAndAssertTimestamp(p3keyB, 2, 3);
    assertEquals(true, lockA.get());

    // wait until p2 is fully completed
    p2keyA.setThrowSuspend(false);
    procExecutor.getScheduler().addFront(p2keyA);
    ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA);
    waitAndAssertTimestamp(p1keyA, 4, 60000);
    waitAndAssertTimestamp(p2keyA, 2, 8);
    waitAndAssertTimestamp(p3keyB, 2, 3);
    assertEquals(false, lockA.get());
    assertEquals(false, lockB.get());
  }

  /**
   * Two yielding procedures share one lock. p2 must not execute while p1 keeps yielding, and
   * p2's first execution must come right after p1's last one.
   */
  @Test
  public void testYieldWhileHoldingLocks() {
    final AtomicBoolean lock = new AtomicBoolean(false);

    final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false);
    final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false);

    procExecutor.submitProcedure(p1);
    procExecutor.submitProcedure(p2);

    // try to execute a bunch of yield on p1, p2 should be blocked
    while (p1.getTimestamps().size() < 100) {
      Threads.sleep(10);
    }
    assertEquals(0, p2.getTimestamps().size());

    // wait until p1 is completed
    p1.setThrowYield(false);
    ProcedureTestingUtility.waitProcedure(procExecutor, p1);

    // try to execute a bunch of yield on p2
    while (p2.getTimestamps().size() < 100) {
      Threads.sleep(10);
    }
    assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1,
      p2.getTimestamps().get(0).longValue());

    // wait until p2 is completed.
    // This used to stop and wait on p1 again (already finished), which left p2 yielding
    // until tearDown() stopped the executor.
    p2.setThrowYield(false);
    ProcedureTestingUtility.waitProcedure(procExecutor, p2);
  }

  /**
   * Polls (every 10 ms, without a timeout of its own) until {@code proc} has recorded at least
   * {@code size} timestamps, then asserts that it recorded exactly {@code size} of them and, when
   * {@code size > 0}, that the most recent one equals {@code lastTs}.
   *
   * @param proc   procedure whose recorded timestamps are inspected
   * @param size   expected number of recorded timestamps
   * @param lastTs expected value of the last recorded timestamp; ignored when {@code size} is 0
   */
  private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) {
    final ArrayList<Long> timestamps = proc.getTimestamps();
    while (timestamps.size() < size) {
      Threads.sleep(10);
    }
    LOG.info("{} -> {}", proc, timestamps);
    assertEquals(size, timestamps.size());
    if (size > 0) {
      assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue());
    }
  }

  /**
   * Procedure guarded by an externally supplied {@link AtomicBoolean} used as a lock. Depending
   * on its flags, each {@code execute} call either marks the procedure failed, yields, suspends,
   * or completes. Every {@code execute} and {@code rollback} call appends a timestamp obtained
   * from the {@link TestProcEnv}.
   */
  public static class TestLockProcedure extends Procedure<TestProcEnv> {
    private final ArrayList<Long> timestamps = new ArrayList<>();
    private final String key;
    private final AtomicBoolean lock;

    // The flags are flipped through the setters by the test and read inside execute();
    // volatile so that an update is visible to whichever thread runs execute().
    private volatile boolean triggerRollback = false;
    private volatile boolean throwSuspend = false;
    private volatile boolean throwYield = false;
    private boolean hasLock = false;

    /**
     * @param lock         flag acting as the lock; {@code true} means held
     * @param key          label used only in {@code toString} output
     * @param throwYield   whether {@code execute} should throw {@link ProcedureYieldException}
     * @param throwSuspend whether {@code execute} should throw
     *                     {@link ProcedureSuspendedException}
     */
    public TestLockProcedure(final AtomicBoolean lock, final String key,
        final boolean throwYield, final boolean throwSuspend) {
      this.lock = lock;
      this.key = key;
      this.throwYield = throwYield;
      this.throwSuspend = throwSuspend;
    }

    public void setThrowYield(final boolean throwYield) {
      this.throwYield = throwYield;
    }

    public void setThrowSuspend(final boolean throwSuspend) {
      this.throwSuspend = throwSuspend;
    }

    public void setTriggerRollback(final boolean triggerRollback) {
      this.triggerRollback = triggerRollback;
    }

    /**
     * Records a timestamp, then acts on the flags in priority order: rollback trigger, yield,
     * suspend. With no flag set the procedure completes without children.
     */
    @Override
    protected Procedure[] execute(final TestProcEnv env)
        throws ProcedureYieldException, ProcedureSuspendedException {
      LOG.info("EXECUTE {} suspend {}", this, lock != null);
      timestamps.add(env.nextTimestamp());
      if (triggerRollback) {
        setFailure(getClass().getSimpleName(), new Exception("injected failure"));
      } else if (throwYield) {
        throw new ProcedureYieldException();
      } else if (throwSuspend) {
        throw new ProcedureSuspendedException();
      }
      return null;
    }

    /**
     * Records the next timestamp multiplied by 10000 (presumably so rollback entries are easy to
     * tell apart from execute entries in the recorded list).
     */
    @Override
    protected void rollback(final TestProcEnv env) {
      LOG.info("ROLLBACK {}", this);
      timestamps.add(env.nextTimestamp() * 10000);
    }

    /** Tries to flip the shared flag from free to held; asks to wait when it is already held. */
    @Override
    protected LockState acquireLock(final TestProcEnv env) {
      hasLock = lock.compareAndSet(false, true);
      if (!hasLock) {
        return LockState.LOCK_YIELD_WAIT;
      }
      LOG.info("ACQUIRE LOCK {} {}", this, hasLock);
      return LockState.LOCK_ACQUIRED;
    }

    /** Unconditionally marks the shared flag as free. */
    @Override
    protected void releaseLock(final TestProcEnv env) {
      LOG.info("RELEASE LOCK {} {}", this, hasLock);
      lock.set(false);
    }

    /** Always {@code true}: this procedure asks to keep its lock between executions. */
    @Override
    protected boolean holdLock(final TestProcEnv env) {
      return true;
    }

    /** Returns the live list of recorded timestamps (not a copy). */
    public ArrayList<Long> getTimestamps() {
      return timestamps;
    }

    @Override
    protected void toStringClassDetails(StringBuilder builder) {
      builder.append(getClass().getName()).append("(").append(key).append(")");
    }

    @Override
    protected boolean abort(TestProcEnv env) {
      return false;
    }

    // No state to persist.
    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
    }

    // No state to restore.
    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
    }
  }

  /** Procedure environment exposing a shared, monotonically increasing counter starting at 1. */
  private static class TestProcEnv {
    public final AtomicLong timestamp = new AtomicLong(0);

    public long nextTimestamp() {
      return timestamp.incrementAndGet();
    }
  }
}