001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 029import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 030import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 031import org.apache.hadoop.hbase.testclassification.MasterTests; 032import org.apache.hadoop.hbase.testclassification.SmallTests; 033import org.apache.hadoop.hbase.util.Threads; 034import org.junit.jupiter.api.AfterEach; 035import org.junit.jupiter.api.BeforeEach; 036import org.junit.jupiter.api.Tag; 037import org.junit.jupiter.api.Test; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041@Tag(MasterTests.TAG) 042@Tag(SmallTests.TAG) 043public class TestProcedureSuspended { 044 045 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureSuspended.class); 046 047 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 048 private static final Procedure NULL_PROC = null; 049 050 private ProcedureExecutor<TestProcEnv> procExecutor; 051 private ProcedureStore procStore; 052 053 private HBaseCommonTestingUtil htu; 054 055 @BeforeEach 056 public void setUp() throws IOException { 057 htu = new HBaseCommonTestingUtil(); 058 059 procStore = new NoopProcedureStore(); 060 procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore); 061 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 062 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 063 } 064 065 @AfterEach 066 public void tearDown() throws IOException { 067 procExecutor.stop(); 068 procStore.stop(false); 069 } 070 071 @Test 072 public void testSuspendWhileHoldingLocks() { 073 final AtomicBoolean lockA = new AtomicBoolean(false); 074 final AtomicBoolean lockB = new AtomicBoolean(false); 075 076 final TestLockProcedure p1keyA = new TestLockProcedure(lockA, "keyA", false, true); 077 final TestLockProcedure p2keyA = new TestLockProcedure(lockA, "keyA", false, true); 078 final TestLockProcedure p3keyB = new TestLockProcedure(lockB, "keyB", false, true); 079 080 procExecutor.submitProcedure(p1keyA); 081 procExecutor.submitProcedure(p2keyA); 082 procExecutor.submitProcedure(p3keyB); 083 084 // first run p1, p3 are able to run p2 is blocked by p1 085 waitAndAssertTimestamp(p1keyA, 1, 1); 086 waitAndAssertTimestamp(p2keyA, 0, -1); 087 waitAndAssertTimestamp(p3keyB, 1, 2); 088 assertTrue(lockA.get()); 089 assertTrue(lockB.get()); 090 091 // release p3 092 p3keyB.setThrowSuspend(false); 093 procExecutor.getScheduler().addFront(p3keyB); 094 waitAndAssertTimestamp(p1keyA, 1, 1); 095 waitAndAssertTimestamp(p2keyA, 0, -1); 096 waitAndAssertTimestamp(p3keyB, 2, 3); 097 assertTrue(lockA.get()); 098 099 // wait until p3 is fully completed 100 ProcedureTestingUtility.waitProcedure(procExecutor, p3keyB); 101 assertFalse(lockB.get()); 102 103 // rollback p2 and wait until is fully completed 104 p1keyA.setTriggerRollback(true); 105 procExecutor.getScheduler().addFront(p1keyA); 106 ProcedureTestingUtility.waitProcedure(procExecutor, p1keyA); 107 108 // p2 should start and suspend 109 waitAndAssertTimestamp(p1keyA, 4, 60000); 110 waitAndAssertTimestamp(p2keyA, 1, 7); 111 waitAndAssertTimestamp(p3keyB, 2, 3); 112 assertTrue(lockA.get()); 113 114 // wait until p2 is fully completed 115 p2keyA.setThrowSuspend(false); 116 procExecutor.getScheduler().addFront(p2keyA); 117 ProcedureTestingUtility.waitProcedure(procExecutor, p2keyA); 118 waitAndAssertTimestamp(p1keyA, 4, 60000); 119 waitAndAssertTimestamp(p2keyA, 2, 8); 120 waitAndAssertTimestamp(p3keyB, 2, 3); 121 assertFalse(lockA.get()); 122 assertFalse(lockB.get()); 123 } 124 125 @Test 126 public void testYieldWhileHoldingLocks() { 127 final AtomicBoolean lock = new AtomicBoolean(false); 128 129 final TestLockProcedure p1 = new TestLockProcedure(lock, "key", true, false); 130 final TestLockProcedure p2 = new TestLockProcedure(lock, "key", true, false); 131 132 procExecutor.submitProcedure(p1); 133 procExecutor.submitProcedure(p2); 134 135 // try to execute a bunch of yield on p1, p2 should be blocked 136 while (p1.getTimestamps().size() < 100) { 137 Threads.sleep(10); 138 } 139 140 assertEquals(0, p2.getTimestamps().size()); 141 142 // wait until p1 is completed 143 p1.setThrowYield(false); 144 ProcedureTestingUtility.waitProcedure(procExecutor, p1); 145 146 // try to execute a bunch of yield on p2 147 while (p2.getTimestamps().size() < 100) { 148 Threads.sleep(10); 149 } 150 151 assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1, 152 p2.getTimestamps().get(0).longValue()); 153 154 // wait until p2 is completed 155 p1.setThrowYield(false); 156 ProcedureTestingUtility.waitProcedure(procExecutor, p1); 157 } 158 159 private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) { 160 final ArrayList<Long> timestamps = proc.getTimestamps(); 161 while (timestamps.size() < size) { 162 Threads.sleep(10); 163 } 164 165 LOG.info("{} -> {}", proc, timestamps); 166 assertEquals(size, timestamps.size()); 167 if (size > 0) { 168 assertEquals(lastTs, timestamps.get(timestamps.size() - 1).longValue()); 169 } 170 } 171 172 public static class TestLockProcedure extends Procedure<TestProcEnv> { 173 private final ArrayList<Long> timestamps = new ArrayList<>(); 174 private final String key; 175 176 private boolean triggerRollback = false; 177 private boolean throwSuspend = false; 178 private boolean throwYield = false; 179 private AtomicBoolean lock = null; 180 private boolean hasLock = false; 181 182 public TestLockProcedure(final AtomicBoolean lock, final String key, final boolean throwYield, 183 final boolean throwSuspend) { 184 this.lock = lock; 185 this.key = key; 186 this.throwYield = throwYield; 187 this.throwSuspend = throwSuspend; 188 } 189 190 public void setThrowYield(final boolean throwYield) { 191 this.throwYield = throwYield; 192 } 193 194 public void setThrowSuspend(final boolean throwSuspend) { 195 this.throwSuspend = throwSuspend; 196 } 197 198 public void setTriggerRollback(final boolean triggerRollback) { 199 this.triggerRollback = triggerRollback; 200 } 201 202 @Override 203 protected Procedure[] execute(final TestProcEnv env) 204 throws ProcedureYieldException, ProcedureSuspendedException { 205 LOG.info("EXECUTE {} suspend {}", this, lock != null); 206 timestamps.add(env.nextTimestamp()); 207 if (triggerRollback) { 208 setFailure(getClass().getSimpleName(), new Exception("injected failure")); 209 } else if (throwYield) { 210 throw new ProcedureYieldException(); 211 } else if (throwSuspend) { 212 throw new ProcedureSuspendedException(); 213 } 214 return null; 215 } 216 217 @Override 218 protected void rollback(final TestProcEnv env) { 219 LOG.info("ROLLBACK {}", this); 220 timestamps.add(env.nextTimestamp() * 10000); 221 } 222 223 @Override 224 protected LockState acquireLock(final TestProcEnv env) { 225 hasLock = lock.compareAndSet(false, true); 226 if (hasLock) { 227 LOG.info("ACQUIRE LOCK {} {}", this, hasLock); 228 return LockState.LOCK_ACQUIRED; 229 } 230 return LockState.LOCK_YIELD_WAIT; 231 } 232 233 @Override 234 protected void releaseLock(final TestProcEnv env) { 235 LOG.info("RELEASE LOCK {} {}", this, hasLock); 236 lock.set(false); 237 } 238 239 @Override 240 protected boolean holdLock(final TestProcEnv env) { 241 return true; 242 } 243 244 public ArrayList<Long> getTimestamps() { 245 return timestamps; 246 } 247 248 @Override 249 protected void toStringClassDetails(StringBuilder builder) { 250 builder.append(getClass().getName()); 251 builder.append("(" + key + ")"); 252 } 253 254 @Override 255 protected boolean abort(TestProcEnv env) { 256 return false; 257 } 258 259 @Override 260 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 261 } 262 263 @Override 264 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 265 } 266 } 267 268 private static class TestProcEnv { 269 public final AtomicLong timestamp = new AtomicLong(0); 270 271 public long nextTimestamp() { 272 return timestamp.incrementAndGet(); 273 } 274 } 275}