001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.concurrent.atomic.AtomicLong; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 032import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.SmallTests; 035import org.junit.jupiter.api.AfterEach; 036import org.junit.jupiter.api.BeforeEach; 037import org.junit.jupiter.api.Tag; 038import org.junit.jupiter.api.Test; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042@Tag(MasterTests.TAG) 043@Tag(SmallTests.TAG) 044public class TestYieldProcedures { 045 046 private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class); 047 048 private static final int PROCEDURE_EXECUTOR_SLOTS = 1; 049 private static final Procedure NULL_PROC = null; 050 051 private ProcedureExecutor<TestProcEnv> procExecutor; 052 private TestScheduler procRunnables; 053 private ProcedureStore procStore; 054 055 private HBaseCommonTestingUtil htu; 056 private FileSystem fs; 057 private Path testDir; 058 private Path logDir; 059 060 @BeforeEach 061 public void setUp() throws IOException { 062 htu = new HBaseCommonTestingUtil(); 063 testDir = htu.getDataTestDir(); 064 fs = testDir.getFileSystem(htu.getConfiguration()); 065 assertTrue(testDir.depth() > 1); 066 067 logDir = new Path(testDir, "proc-logs"); 068 procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); 069 procRunnables = new TestScheduler(); 070 procExecutor = 071 new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); 072 procStore.start(PROCEDURE_EXECUTOR_SLOTS); 073 ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); 074 } 075 076 @AfterEach 077 public void tearDown() throws IOException { 078 procExecutor.stop(); 079 procStore.stop(false); 080 fs.delete(logDir, true); 081 } 082 083 @Test 084 public void testYieldEachExecutionStep() throws Exception { 085 final int NUM_STATES = 3; 086 087 TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3]; 088 for (int i = 0; i < procs.length; ++i) { 089 procs[i] = new TestStateMachineProcedure(true, false); 090 procExecutor.submitProcedure(procs[i]); 091 } 092 ProcedureTestingUtility.waitNoProcedureRunning(procExecutor); 093 094 for (int i = 0; i < procs.length; ++i) { 095 assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size()); 096 097 // verify execution 098 int index = 0; 099 for (int execStep = 0; execStep < NUM_STATES; ++execStep) { 100 TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++); 101 assertFalse(info.isRollback()); 102 assertEquals(execStep, info.getStep().ordinal()); 103 } 104 105 // verify rollback 106 for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) { 107 TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++); 108 assertTrue(info.isRollback()); 109 assertEquals(execStep, info.getStep().ordinal()); 110 } 111 } 112 113 // check runnable queue stats 114 assertEquals(0, procRunnables.size()); 115 assertEquals(0, procRunnables.addFrontCalls); 116 assertEquals(15, procRunnables.addBackCalls); 117 assertEquals(12, procRunnables.yieldCalls); 118 assertEquals(16, procRunnables.pollCalls); 119 assertEquals(3, procRunnables.completionCalls); 120 } 121 122 @Test 123 public void testYieldOnInterrupt() throws Exception { 124 final int NUM_STATES = 3; 125 int count = 0; 126 127 TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true); 128 ProcedureTestingUtility.submitAndWait(procExecutor, proc); 129 130 // test execute (we execute steps twice, one has the IE the other completes) 131 assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size()); 132 for (int i = 0; i < NUM_STATES; ++i) { 133 TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++); 134 assertFalse(info.isRollback()); 135 assertEquals(i, info.getStep().ordinal()); 136 137 info = proc.getExecutionInfo().get(count++); 138 assertFalse(info.isRollback()); 139 assertEquals(i, info.getStep().ordinal()); 140 } 141 142 // test rollback (we execute steps twice, rollback counts both IE and completed) 143 for (int i = NUM_STATES - 1; i >= 0; --i) { 144 TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++); 145 assertTrue(info.isRollback()); 146 assertEquals(i, info.getStep().ordinal()); 147 } 148 149 for (int i = NUM_STATES - 1; i >= 0; --i) { 150 TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++); 151 assertEquals(true, info.isRollback()); 152 assertEquals(0, info.getStep().ordinal()); 153 } 154 155 // check runnable queue stats 156 assertEquals(0, procRunnables.size()); 157 assertEquals(0, procRunnables.addFrontCalls); 158 assertEquals(11, procRunnables.addBackCalls); 159 assertEquals(10, procRunnables.yieldCalls); 160 assertEquals(12, procRunnables.pollCalls); 161 assertEquals(1, procRunnables.completionCalls); 162 } 163 164 @Test 165 public void testYieldException() { 166 TestYieldProcedure proc = new TestYieldProcedure(); 167 ProcedureTestingUtility.submitAndWait(procExecutor, proc); 168 assertEquals(6, proc.step); 169 170 // check runnable queue stats 171 assertEquals(0, procRunnables.size()); 172 assertEquals(0, procRunnables.addFrontCalls); 173 assertEquals(6, procRunnables.addBackCalls); 174 assertEquals(5, procRunnables.yieldCalls); 175 assertEquals(7, procRunnables.pollCalls); 176 assertEquals(1, procRunnables.completionCalls); 177 } 178 179 private static class TestProcEnv { 180 public final AtomicLong timestamp = new AtomicLong(0); 181 182 public long nextTimestamp() { 183 return timestamp.incrementAndGet(); 184 } 185 } 186 187 public static class TestStateMachineProcedure 188 extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> { 189 enum State { 190 STATE_1, 191 STATE_2, 192 STATE_3 193 } 194 195 public static class ExecutionInfo { 196 private final boolean rollback; 197 private final long timestamp; 198 private final State step; 199 200 public ExecutionInfo(long timestamp, State step, boolean isRollback) { 201 this.timestamp = timestamp; 202 this.step = step; 203 this.rollback = isRollback; 204 } 205 206 public State getStep() { 207 return step; 208 } 209 210 public long getTimestamp() { 211 return timestamp; 212 } 213 214 public boolean isRollback() { 215 return rollback; 216 } 217 } 218 219 private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>(); 220 private final AtomicBoolean aborted = new AtomicBoolean(false); 221 private final boolean throwInterruptOnceOnEachStep; 222 private final boolean abortOnFinalStep; 223 224 public TestStateMachineProcedure() { 225 this(false, false); 226 } 227 228 public TestStateMachineProcedure(boolean abortOnFinalStep, 229 boolean throwInterruptOnceOnEachStep) { 230 this.abortOnFinalStep = abortOnFinalStep; 231 this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep; 232 } 233 234 public ArrayList<ExecutionInfo> getExecutionInfo() { 235 return executionInfo; 236 } 237 238 @Override 239 protected boolean isRollbackSupported(State state) { 240 return true; 241 } 242 243 @Override 244 protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state) 245 throws InterruptedException { 246 final long ts = env.nextTimestamp(); 247 LOG.info(getProcId() + " execute step " + state + " ts=" + ts); 248 executionInfo.add(new ExecutionInfo(ts, state, false)); 249 Thread.sleep(150); 250 251 if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) { 252 LOG.debug("THROW INTERRUPT"); 253 throw new InterruptedException("test interrupt"); 254 } 255 256 switch (state) { 257 case STATE_1: 258 setNextState(State.STATE_2); 259 break; 260 case STATE_2: 261 setNextState(State.STATE_3); 262 break; 263 case STATE_3: 264 if (abortOnFinalStep) { 265 setFailure("test", new IOException("Requested abort on final step")); 266 } 267 return Flow.NO_MORE_STATE; 268 default: 269 throw new UnsupportedOperationException(); 270 } 271 return Flow.HAS_MORE_STATE; 272 } 273 274 @Override 275 protected void rollbackState(TestProcEnv env, final State state) throws InterruptedException { 276 final long ts = env.nextTimestamp(); 277 LOG.debug(getProcId() + " rollback state " + state + " ts=" + ts); 278 executionInfo.add(new ExecutionInfo(ts, state, true)); 279 Thread.sleep(150); 280 281 if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) { 282 LOG.debug("THROW INTERRUPT"); 283 throw new InterruptedException("test interrupt"); 284 } 285 286 switch (state) { 287 case STATE_1: 288 break; 289 case STATE_2: 290 break; 291 case STATE_3: 292 break; 293 default: 294 throw new UnsupportedOperationException(); 295 } 296 } 297 298 @Override 299 protected State getState(final int stateId) { 300 return State.values()[stateId]; 301 } 302 303 @Override 304 protected int getStateId(final State state) { 305 return state.ordinal(); 306 } 307 308 @Override 309 protected State getInitialState() { 310 return State.STATE_1; 311 } 312 313 @Override 314 protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) { 315 return true; 316 } 317 318 @Override 319 protected boolean abort(TestProcEnv env) { 320 aborted.set(true); 321 return true; 322 } 323 } 324 325 public static class TestYieldProcedure extends Procedure<TestProcEnv> { 326 private int step = 0; 327 328 public TestYieldProcedure() { 329 } 330 331 @Override 332 protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException { 333 LOG.info("execute step " + step); 334 if (step++ < 5) { 335 throw new ProcedureYieldException(); 336 } 337 return null; 338 } 339 340 @Override 341 protected void rollback(TestProcEnv env) { 342 } 343 344 @Override 345 protected boolean abort(TestProcEnv env) { 346 return false; 347 } 348 349 @Override 350 protected boolean isYieldAfterExecutionStep(final TestProcEnv env) { 351 return true; 352 } 353 354 @Override 355 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 356 } 357 358 @Override 359 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 360 } 361 } 362 363 private static class TestScheduler extends SimpleProcedureScheduler { 364 private int completionCalls; 365 private int addFrontCalls; 366 private int addBackCalls; 367 private int yieldCalls; 368 private int pollCalls; 369 370 public TestScheduler() { 371 } 372 373 @Override 374 public void addFront(final Procedure proc) { 375 addFrontCalls++; 376 super.addFront(proc); 377 } 378 379 @Override 380 public void addBack(final Procedure proc) { 381 addBackCalls++; 382 super.addBack(proc); 383 } 384 385 @Override 386 public void yield(final Procedure proc) { 387 yieldCalls++; 388 super.yield(proc); 389 } 390 391 @Override 392 public Procedure poll() { 393 pollCalls++; 394 return super.poll(); 395 } 396 397 @Override 398 public Procedure poll(long timeout, TimeUnit unit) { 399 pollCalls++; 400 return super.poll(timeout, unit); 401 } 402 403 @Override 404 public void completionCleanup(Procedure proc) { 405 completionCalls++; 406 } 407 } 408}