/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs procedures that yield (by flag, by {@link InterruptedException}, or by
 * {@link ProcedureYieldException}) on a single-slot {@link ProcedureExecutor} and checks both
 * the order in which steps were recorded and the number of scheduler calls
 * (addFront / addBack / yield / poll / completionCleanup) observed by {@link TestScheduler}.
 */
@Category({MasterTests.class, SmallTests.class})
public class TestYieldProcedures {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestYieldProcedures.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestYieldProcedures.class);

  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;

  private ProcedureExecutor<TestProcEnv> procExecutor;
  private TestScheduler procRunnables;
  private ProcedureStore procStore;

  private HBaseCommonTestingUtility htu;
  private FileSystem fs;
  private Path testDir;
  private Path logDir;

  /**
   * Creates a WAL-backed procedure store under the test data directory, a counting
   * {@link TestScheduler}, and an executor started with {@link #PROCEDURE_EXECUTOR_SLOTS} slots.
   *
   * @throws IOException if the file system, store or executor cannot be set up
   */
  @Before
  public void setUp() throws IOException {
    htu = new HBaseCommonTestingUtility();
    testDir = htu.getDataTestDir();
    fs = testDir.getFileSystem(htu.getConfiguration());
    assertTrue(testDir.depth() > 1);

    logDir = new Path(testDir, "proc-logs");
    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
    procRunnables = new TestScheduler();
    procExecutor =
        new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables);
    procStore.start(PROCEDURE_EXECUTOR_SLOTS);
    ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false,
      true);
  }

  /**
   * Stops the executor and the store and deletes the procedure log directory. Each step runs in
   * a {@code finally} so that a failure in an earlier step does not skip the later cleanup.
   *
   * @throws IOException if deleting the log directory fails
   */
  @After
  public void tearDown() throws IOException {
    try {
      procExecutor.stop();
    } finally {
      try {
        procStore.stop(false);
      } finally {
        fs.delete(logDir, true);
      }
    }
  }

  /**
   * Submits three state-machine procedures that fail on their final step and verifies that each
   * one recorded its three execute steps in order followed by three rollback steps in reverse
   * order, then checks the scheduler call counters.
   */
  @Test
  public void testYieldEachExecutionStep() throws Exception {
    final int NUM_STATES = 3;

    TestStateMachineProcedure[] procs = new TestStateMachineProcedure[3];
    for (int i = 0; i < procs.length; ++i) {
      procs[i] = new TestStateMachineProcedure(true, false);
      procExecutor.submitProcedure(procs[i]);
    }
    ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);

    for (int i = 0; i < procs.length; ++i) {
      assertEquals(NUM_STATES * 2, procs[i].getExecutionInfo().size());

      // verify execution
      int index = 0;
      for (int execStep = 0; execStep < NUM_STATES; ++execStep) {
        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
        assertEquals(false, info.isRollback());
        assertEquals(execStep, info.getStep().ordinal());
      }

      // verify rollback
      for (int execStep = NUM_STATES - 1; execStep >= 0; --execStep) {
        TestStateMachineProcedure.ExecutionInfo info = procs[i].getExecutionInfo().get(index++);
        assertEquals(true, info.isRollback());
        assertEquals(execStep, info.getStep().ordinal());
      }
    }

    // check runnable queue stats
    assertEquals(0, procRunnables.size());
    assertEquals(0, procRunnables.addFrontCalls);
    assertEquals(15, procRunnables.addBackCalls);
    assertEquals(12, procRunnables.yieldCalls);
    assertEquals(16, procRunnables.pollCalls);
    assertEquals(3, procRunnables.completionCalls);
  }

  /**
   * Runs one state-machine procedure that throws an {@link InterruptedException} on every other
   * recorded step and fails on its final state, then verifies the recorded execute/rollback
   * sequence and the scheduler call counters.
   */
  @Test
  public void testYieldOnInterrupt() throws Exception {
    final int NUM_STATES = 3;
    int count = 0;

    TestStateMachineProcedure proc = new TestStateMachineProcedure(true, true);
    ProcedureTestingUtility.submitAndWait(procExecutor, proc);

    // test execute (we execute steps twice, one has the IE the other completes)
    assertEquals(NUM_STATES * 4, proc.getExecutionInfo().size());
    for (int i = 0; i < NUM_STATES; ++i) {
      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
      assertEquals(false, info.isRollback());
      assertEquals(i, info.getStep().ordinal());

      info = proc.getExecutionInfo().get(count++);
      assertEquals(false, info.isRollback());
      assertEquals(i, info.getStep().ordinal());
    }

    // test rollback (we execute steps twice, rollback counts both IE and completed)
    for (int i = NUM_STATES - 1; i >= 0; --i) {
      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
      assertEquals(true, info.isRollback());
      assertEquals(i, info.getStep().ordinal());
    }

    // The remaining rollback entries are all expected to be recorded against the first state.
    for (int i = NUM_STATES - 1; i >= 0; --i) {
      TestStateMachineProcedure.ExecutionInfo info = proc.getExecutionInfo().get(count++);
      assertEquals(true, info.isRollback());
      assertEquals(0, info.getStep().ordinal());
    }

    // check runnable queue stats
    assertEquals(0, procRunnables.size());
    assertEquals(0, procRunnables.addFrontCalls);
    assertEquals(11, procRunnables.addBackCalls);
    assertEquals(10, procRunnables.yieldCalls);
    assertEquals(12, procRunnables.pollCalls);
    assertEquals(1, procRunnables.completionCalls);
  }

  /**
   * Runs a procedure that throws {@link ProcedureYieldException} on its first five executions
   * and verifies that it ended with {@code step == 6}, then checks the scheduler call counters.
   */
  @Test
  public void testYieldException() {
    TestYieldProcedure proc = new TestYieldProcedure();
    ProcedureTestingUtility.submitAndWait(procExecutor, proc);
    assertEquals(6, proc.step);

    // check runnable queue stats
    assertEquals(0, procRunnables.size());
    assertEquals(0, procRunnables.addFrontCalls);
    assertEquals(6, procRunnables.addBackCalls);
    assertEquals(5, procRunnables.yieldCalls);
    assertEquals(7, procRunnables.pollCalls);
    assertEquals(1, procRunnables.completionCalls);
  }

  /** Procedure environment that hands out monotonically increasing timestamps. */
  private static class TestProcEnv {
    public final AtomicLong timestamp = new AtomicLong(0);

    /** Returns the next timestamp; the first call returns {@code 1}. */
    public long nextTimestamp() {
      return timestamp.incrementAndGet();
    }
  }

  /**
   * Three-state procedure that records every execute and rollback invocation in
   * {@link #getExecutionInfo()}. It can optionally fail on the final state and/or throw an
   * {@link InterruptedException} on every other recorded invocation.
   */
  public static class TestStateMachineProcedure
      extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
    enum State { STATE_1, STATE_2, STATE_3 }

    /** Immutable record of a single execute or rollback invocation. */
    public static class ExecutionInfo {
      private final boolean rollback;
      private final long timestamp;
      private final State step;

      /**
       * @param timestamp value obtained from {@link TestProcEnv#nextTimestamp()}
       * @param step the state the invocation was made for
       * @param isRollback {@code true} for a rollback invocation, {@code false} for an execute
       */
      public ExecutionInfo(long timestamp, State step, boolean isRollback) {
        this.timestamp = timestamp;
        this.step = step;
        this.rollback = isRollback;
      }

      public State getStep() {
        return step;
      }

      public long getTimestamp() {
        return timestamp;
      }

      public boolean isRollback() {
        return rollback;
      }
    }

    private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
    // Set by abort(); nothing in this file reads it.
    private final AtomicBoolean aborted = new AtomicBoolean(false);
    private final boolean throwInterruptOnceOnEachStep;
    private final boolean abortOnFinalStep;

    /** Creates a procedure that neither fails on the final step nor throws interrupts. */
    public TestStateMachineProcedure() {
      this(false, false);
    }

    /**
     * @param abortOnFinalStep if {@code true}, {@code STATE_3} marks the procedure as failed
     * @param throwInterruptOnceOnEachStep if {@code true}, every other recorded invocation
     *          throws an {@link InterruptedException} after being recorded
     */
    public TestStateMachineProcedure(boolean abortOnFinalStep,
        boolean throwInterruptOnceOnEachStep) {
      this.abortOnFinalStep = abortOnFinalStep;
      this.throwInterruptOnceOnEachStep = throwInterruptOnceOnEachStep;
    }

    /** Returns the live list of recorded invocations, in the order they happened. */
    public ArrayList<ExecutionInfo> getExecutionInfo() {
      return executionInfo;
    }

    /**
     * Records the invocation, sleeps briefly, optionally throws the test interrupt, and then
     * advances {@code STATE_1 -> STATE_2 -> STATE_3}. On {@code STATE_3} the procedure is
     * finished, after being marked failed when {@code abortOnFinalStep} is set.
     *
     * @throws InterruptedException when the interrupt injection triggers, or if the sleep is
     *           interrupted
     */
    @Override
    protected StateMachineProcedure.Flow executeFromState(TestProcEnv env, State state)
        throws InterruptedException {
      final long ts = env.nextTimestamp();
      LOG.info("{} execute step {} ts={}", getProcId(), state, ts);
      executionInfo.add(new ExecutionInfo(ts, state, false));
      Thread.sleep(150);

      throwInterruptIfRequested();

      switch (state) {
        case STATE_1:
          setNextState(State.STATE_2);
          break;
        case STATE_2:
          setNextState(State.STATE_3);
          break;
        case STATE_3:
          if (abortOnFinalStep) {
            setFailure("test", new IOException("Requested abort on final step"));
          }
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException();
      }
      return Flow.HAS_MORE_STATE;
    }

    /**
     * Records the rollback invocation, sleeps briefly and optionally throws the test interrupt.
     * No per-state undo work is performed.
     *
     * @throws InterruptedException when the interrupt injection triggers, or if the sleep is
     *           interrupted
     */
    @Override
    protected void rollbackState(TestProcEnv env, final State state)
        throws InterruptedException {
      final long ts = env.nextTimestamp();
      LOG.debug("{} rollback state {} ts={}", getProcId(), state, ts);
      executionInfo.add(new ExecutionInfo(ts, state, true));
      Thread.sleep(150);

      throwInterruptIfRequested();

      switch (state) {
        case STATE_1:
        case STATE_2:
        case STATE_3:
          // Nothing to undo for any known state.
          break;
        default:
          throw new UnsupportedOperationException();
      }
    }

    /**
     * Throws the test interrupt when injection is enabled and the invocation that was just
     * recorded sits at an even index of {@link #executionInfo} (i.e. the 1st, 3rd, 5th, ...
     * recorded invocation).
     */
    private void throwInterruptIfRequested() throws InterruptedException {
      if (throwInterruptOnceOnEachStep && ((executionInfo.size() - 1) % 2) == 0) {
        LOG.debug("THROW INTERRUPT");
        throw new InterruptedException("test interrupt");
      }
    }

    @Override
    protected State getState(final int stateId) {
      return State.values()[stateId];
    }

    @Override
    protected int getStateId(final State state) {
      return state.ordinal();
    }

    @Override
    protected State getInitialState() {
      return State.STATE_1;
    }

    /** Always requests a yield before each state is executed. */
    @Override
    protected boolean isYieldBeforeExecuteFromState(TestProcEnv env, State state) {
      return true;
    }

    /** Remembers that an abort was requested and reports it as accepted. */
    @Override
    protected boolean abort(TestProcEnv env) {
      aborted.set(true);
      return true;
    }
  }

  /**
   * Procedure that throws {@link ProcedureYieldException} on its first five executions and
   * completes with no children on the sixth.
   */
  public static class TestYieldProcedure extends Procedure<TestProcEnv> {
    private int step = 0;

    public TestYieldProcedure() {
    }

    @Override
    protected Procedure[] execute(final TestProcEnv env) throws ProcedureYieldException {
      LOG.info("execute step {}", step);
      // Post-increment: the counter advances on every call, including the final one.
      if (step++ < 5) {
        throw new ProcedureYieldException();
      }
      return null;
    }

    @Override
    protected void rollback(TestProcEnv env) {
    }

    @Override
    protected boolean abort(TestProcEnv env) {
      return false;
    }

    @Override
    protected boolean isYieldAfterExecutionStep(final TestProcEnv env) {
      return true;
    }

    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer)
        throws IOException {
    }
  }

  /**
   * {@link SimpleProcedureScheduler} that counts how often each scheduling entry point is
   * invoked before delegating to the parent implementation.
   */
  private static class TestScheduler extends SimpleProcedureScheduler {
    // NOTE(review): plain ints incremented without synchronization here; presumably they are
    // touched by both the test thread and executor threads -- confirm this is acceptable.
    private int completionCalls;
    private int addFrontCalls;
    private int addBackCalls;
    private int yieldCalls;
    private int pollCalls;

    public TestScheduler() {}

    @Override
    public void addFront(final Procedure proc) {
      addFrontCalls++;
      super.addFront(proc);
    }

    @Override
    public void addBack(final Procedure proc) {
      addBackCalls++;
      super.addBack(proc);
    }

    @Override
    public void yield(final Procedure proc) {
      yieldCalls++;
      super.yield(proc);
    }

    @Override
    public Procedure poll() {
      pollCalls++;
      return super.poll();
    }

    @Override
    public Procedure poll(long timeout, TimeUnit unit) {
      pollCalls++;
      return super.poll(timeout, unit);
    }

    @Override
    public Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit) {
      pollCalls++;
      return super.poll(onlyUrgent, timeout, unit);
    }

    /** Counts the call only; intentionally does not delegate to the parent. */
    @Override
    public void completionCleanup(Procedure proc) {
      completionCalls++;
    }
  }
}