001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.concurrent.atomic.AtomicLong; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseCommonTestingUtility; 028import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 029import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 030import org.apache.hadoop.hbase.testclassification.MasterTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.apache.hadoop.hbase.util.Threads; 033import org.junit.After; 034import org.junit.Before; 035import org.junit.ClassRule; 036import org.junit.Test; 037import org.junit.experimental.categories.Category; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041@Category({ MasterTests.class, SmallTests.class }) 042public class TestProcedureSuspended { 043 @ClassRule 044 public static final HBaseClassTestRule CLASS_RULE = 045 HBaseClassTestRule.forClass(TestProcedureSuspended.class); 046 047 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureSuspended.class); 048 049 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 050 private static final Procedure NULL_PROC = null; 051 052 private ProcedureExecutor<TestProcEnv> procExecutor; 053 private ProcedureStore procStore; 054 055 private HBaseCommonTestingUtility htu; 056 057 @Before 058 public void setUp() throws IOException { 059 htu = new HBaseCommonTestingUtility(); 060 061 procStore = new NoopProcedureStore(); 062 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore); 063 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 064 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 065 } 066 067 @After 068 public void tearDown() throws IOException { 069 procExecutor.stop(); 070 procStore.stop(false); 071 } 072 073 @Test 074 public void testSuspendWhileHoldingLocks() { 075 final AtomicBoolean lockA = new AtomicBoolean(false); 076 final AtomicBoolean lockB = new AtomicBoolean(false); 077 078 final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true); 079 final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true); 080 final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true); 081 082 procExecutor.submitProcedure(p1keyA); 083 procExecutor.submitProcedure(p2keyA); 084 procExecutor.submitProcedure(p3keyB); 085 086 // first run p1, p3 are able to run p2 is blocked by p1 087 waitAndAssertTimestamp(p1keyA, 1, 1); 088 waitAndAssertTimestamp(p2keyA, 0, -1); 089 waitAndAssertTimestamp(p3keyB, 1, 2); 090 assertEquals(true, lockA.get()); 091 assertEquals(true, lockB.get()); 092 093 // release p3 094 p3keyB.setThrowSuspend(false); 095 procExecutor.getScheduler().addFront(p3keyB); 096 waitAndAssertTimestamp(p1keyA, 1, 1); 097 waitAndAssertTimestamp(p2keyA, 0, -1); 098 waitAndAssertTimestamp(p3keyB, 2, 3); 099 assertEquals(true, lockA.get()); 100 101 // wait until p3 is fully completed 102 ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB); 103 assertEquals(false, lockB.get()); 104 105 // rollback p2 and wait until is fully completed 106 p1keyA.setTriggerRollback(true); 107 procExecutor.getScheduler().addFront(p1keyA); 108 ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA); 109 110 // p2 should start and suspend 111 waitAndAssertTimestamp(p1keyA, 4, 60000); 112 waitAndAssertTimestamp(p2keyA, 1, 7); 113 waitAndAssertTimestamp(p3keyB, 2, 3); 114 assertEquals(true, lockA.get()); 115 116 // wait until p2 is fully completed 117 p2keyA.setThrowSuspend(false); 118 procExecutor.getScheduler().addFront(p2keyA); 119 ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA); 120 waitAndAssertTimestamp(p1keyA, 4, 60000); 121 waitAndAssertTimestamp(p2keyA, 2, 8); 122 waitAndAssertTimestamp(p3keyB, 2, 3); 123 assertEquals(false, lockA.get()); 124 assertEquals(false, lockB.get()); 125 } 126 127 @Test 128 public void testYieldWhileHoldingLocks() { 129 final AtomicBoolean lock = new AtomicBoolean(false); 130 131 final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false); 132 final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false); 133 134 procExecutor.submitProcedure(p1); 135 procExecutor.submitProcedure(p2); 136 137 // try to execute a bunch of yield on p1, p2 should be blocked 138 while (p1.getTimestamps().size() < 100) { 139 Threads.sleep(10); 140 } 141 142 assertEquals(0, p2.getTimestamps().size()); 143 144 // wait until p1 is completed 145 p1.setThrowYield(false); 146 ProcedureTestingUtility.waitProcedure(procExecutor, p1); 147 148 // try to execute a bunch of yield on p2 149 while (p2.getTimestamps().size() < 100) { 150 Threads.sleep(10); 151 } 152 153 assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1, 154 p2.getTimestamps().get(0).longValue()); 155 156 // wait until p2 is completed 157 p1.setThrowYield(false); 158 ProcedureTestingUtility.waitProcedure(procExecutor, p1); 159 } 160 161 private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) { 162 final ArrayList<Long> timestamps = proc.getTimestamps(); 163 while (timestamps.size() < size) { 164 Threads.sleep(10); 165 } 166 167 LOG.info(proc + " -> " + timestamps); 168 assertEquals(size, timestamps.size()); 169 if (size > 0) { 170 assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue()); 171 } 172 } 173 174 public static class TestLockProcedure extends Procedure<TestProcEnv> { 175 private final ArrayList<Long> timestamps = new ArrayList<>(); 176 private final String key; 177 178 private boolean triggerRollback = false; 179 private boolean throwSuspend = false; 180 private boolean throwYield = false; 181 private AtomicBoolean lock = null; 182 private boolean hasLock = false; 183 184 public TestLockProcedure(final AtomicBoolean lock, final String key, final boolean throwYield, 185 final boolean throwSuspend) { 186 this.lock = lock; 187 this.key = key; 188 this.throwYield = throwYield; 189 this.throwSuspend = throwSuspend; 190 } 191 192 public void setThrowYield(final boolean throwYield) { 193 this.throwYield = throwYield; 194 } 195 196 public void setThrowSuspend(final boolean throwSuspend) { 197 this.throwSuspend = throwSuspend; 198 } 199 200 public void setTriggerRollback(final boolean triggerRollback) { 201 this.triggerRollback = triggerRollback; 202 } 203 204 @Override 205 protected Procedure[] execute(final TestProcEnv env) 206 throws ProcedureYieldException, ProcedureSuspendedException { 207 LOG.info("EXECUTE " + this + " suspend " + (lock != null)); 208 timestamps.add(env.nextTimestamp()); 209 if (triggerRollback) { 210 setFailure(getClass().getSimpleName(), new Exception("injected failure")); 211 } else if (throwYield) { 212 throw new ProcedureYieldException(); 213 } else if (throwSuspend) { 214 throw new ProcedureSuspendedException(); 215 } 216 return null; 217 } 218 219 @Override 220 protected void rollback(final TestProcEnv env) { 221 LOG.info("ROLLBACK " + this); 222 timestamps.add(env.nextTimestamp() * 10000); 223 } 224 225 @Override 226 protected LockState acquireLock(final TestProcEnv env) { 227 hasLock = lock.compareAndSet(false, true); 228 if (hasLock) { 229 LOG.info("ACQUIRE LOCK " + this + " " + (hasLock)); 230 return LockState.LOCK_ACQUIRED; 231 } 232 return LockState.LOCK_YIELD_WAIT; 233 } 234 235 @Override 236 protected void releaseLock(final TestProcEnv env) { 237 LOG.info("RELEASE LOCK " + this + " " + hasLock); 238 lock.set(false); 239 } 240 241 @Override 242 protected boolean holdLock(final TestProcEnv env) { 243 return true; 244 } 245 246 public ArrayList<Long> getTimestamps() { 247 return timestamps; 248 } 249 250 @Override 251 protected void toStringClassDetails(StringBuilder builder) { 252 builder.append(getClass().getName()); 253 builder.append("(" + key + ")"); 254 } 255 256 @Override 257 protected boolean abort(TestProcEnv env) { 258 return false; 259 } 260 261 @Override 262 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 263 } 264 265 @Override 266 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 267 } 268 } 269 270 private static class TestProcEnv { 271 public final AtomicLong timestamp = new AtomicLong(0); 272 273 public long nextTimestamp() { 274 return timestamp.incrementAndGet(); 275 } 276 } 277}