001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.Map; 024import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 025import org.apache.hadoop.hbase.metrics.Counter; 026import org.apache.hadoop.hbase.metrics.Histogram; 027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 028import org.apache.hadoop.hbase.procedure2.util.StringUtils; 029import org.apache.hadoop.hbase.security.User; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.hadoop.hbase.util.NonceKey; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 037 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 039 040/** 041 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate, 042 * stack-indexes, etc. 043 * <p/> 044 * Procedures are run by a {@link ProcedureExecutor} instance. 
They are submitted and then the 045 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may 046 * be called multiple times in the case of failure or a restart, so code must be idempotent. The 047 * return from an execute call is either: null to indicate we are done; ourselves if there is more to 048 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes 049 * our execution. 050 * <p/> 051 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps 052 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the 053 * ProcedureState enum from protos: 054 * <ul> 055 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may 056 * or may not have rolled back yet. Any procedure in FAILED state will eventually be moved to 057 * ROLLEDBACK state.</li> 058 * <li>{@link #isSuccess()} A procedure has completed successfully without exception.</li> 059 * <li>{@link #isFinished()} As a procedure in FAILED state will be retried forever for rollback, the only 060 * condition under which the scheduler/executor will drop a procedure from further processing is when the procedure 061 * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li> 062 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states 063 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li> 064 * </ul> 065 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their 066 * own state. This can lead to confusion. Try to keep the two distinct. 067 * <p/> 068 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback 069 * step is supposed to clean up the resources created during the execute() step.
In case of failure 070 * and restart, rollback() may be called multiple times, so again the code must be idempotent. 071 * <p/> 072 * A Procedure can be made to respect a locking regime. It has acquire/release methods as well as an 073 * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be 074 * locked for the life of a procedure -- not just the calls to execute -- then implementations 075 * should say so with the {@link #holdLock(Object)} method. 076 * <p/> 077 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846), the 078 * implementation is a bit tricky, so we add some comments here about it. 079 * <ul> 080 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record 081 * whether we have the lock. We will set it to {@code true} in 082 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in 083 * {@link #doReleaseLock(Object, ProcedureStore)}. The subclasses do not need to manage it any 084 * more.</li> 085 * <li>Also added a locked field in the proto message. When storing, the field will be set according 086 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure 087 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in the proto 088 * message is {@code true}.</li> 089 * <li>The reason why we cannot set the {@link #locked} field directly to {@code true} by calling 090 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures 091 * need to wait until the master is initialized. So the solution here is that we introduced a new 092 * method called {@link #waitInitialized(Object)} in Procedure, and moved the wait-for-master-initialized 093 * related code from {@link #acquireLock(Object)} to this method.
And we added a restoreLock method 094 * to Procedure: if {@link #lockedWhenLoading} is {@code true}, we will call 095 * {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to true. And later 096 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the 097 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}; if it is 098 * {@code true}, then we just set the {@link #locked} field to true and return, without actually 099 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li> 100 * </ul> 101 * <p/> 102 * Procedures can be suspended or put in a wait state with a callback that gets executed on a 103 * Procedure-specified timeout. See {@link #setTimeout(int)}, and 104 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure 105 * class for an example usage. 106 * 107 * <p/> 108 * There are hooks for collecting metrics on submit of the procedure and on finish. See 109 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
110 */ 111@InterfaceAudience.Private 112public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> { 113 private static final Logger LOG = LoggerFactory.getLogger(Procedure.class); 114 public static final long NO_PROC_ID = -1; 115 /** Sentinel meaning "no timeout has been set"; see {@link #hasTimeout()}. */ protected static final int NO_TIMEOUT = -1; 116 117 /** Possible outcomes of {@link #acquireLock(Object)}. */ public enum LockState { 118 LOCK_ACQUIRED, // Lock acquired and ready to execute 119 LOCK_YIELD_WAIT, // Lock not acquired, framework needs to yield 120 LOCK_EVENT_WAIT, // Lock not acquired, an event will yield the procedure 121 } 122 123 // Unchanged after initialization 124 private NonceKey nonceKey = null; 125 private String owner = null; 126 private long parentProcId = NO_PROC_ID; 127 private long rootProcId = NO_PROC_ID; 128 private long procId = NO_PROC_ID; 129 private long submittedTime; 130 131 // Runtime state, updated every operation 132 private ProcedureState state = ProcedureState.INITIALIZING; 133 private RemoteProcedureException exception = null; 134 private int[] stackIndexes = null; 135 private int childrenLatch = 0; 136 137 private volatile int timeout = NO_TIMEOUT; 138 private volatile long lastUpdate; 139 140 private volatile byte[] result = null; 141 142 /** Whether this procedure currently holds its lock; exposed through {@link #hasLock()}. */ private volatile boolean locked = false; 143 144 /** Set by {@link #lockedWhenLoading()} when the stored procedure was recorded as holding a lock. */ private boolean lockedWhenLoading = false; 145 146 /** 147 * Used to override completion of the procedure without actually doing any logic in the procedure. 148 * If bypass is set to true, when executing it will return null when 149 * {@link #doExecute(Object)} is called to finish the procedure and release any locks 150 * it may currently hold. The bypass does cleanup around the Procedure as far as the 151 * Procedure framework is concerned. It does not clean any internal state that the 152 * Procedures themselves may have set. That is for the Procedures to do themselves 153 * when bypass is called.
They should override bypass and do their cleanup in the 154 * overridden bypass method (be sure to call the parent bypass to ensure proper 155 * processing). 156 * <p></p>Bypassing a procedure is not like aborting. Aborting a procedure will trigger 157 * a rollback. And since the {@link #abort(Object)} method is overridable, 158 * some procedures may have chosen to ignore the aborting. 159 */ 160 private volatile boolean bypass = false; 161 162 /** 163 * Indicates whether we need to persist the procedure to ProcedureStore after execution. Defaults to 164 * true, and the implementation can call {@link #skipPersistence()} to let the framework skip the 165 * persistence of the procedure. 166 * <p/> 167 * This is useful when the procedure is in error and you want to retry later. The retry interval 168 * and the number of retries are usually not critical, so skipping the persistence can save some 169 * resources, and also speed up the restart processing. 170 * <p/> 171 * Notice that this value will be reset to true every time before execution. And when rolling back 172 * we do not test this value. 173 */ 174 private boolean persist = true; 175 /** @return true once {@link #bypass(Object)} has marked this procedure as bypassed. */ 176 public boolean isBypass() { 177 return bypass; 178 } 179 180 /** 181 * Set the bypass to true. 182 * Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now. 183 * DO NOT use this method alone, since we can't just bypass one single procedure. We need to 184 * bypass its ancestors too. If your Procedure has set state, it needs to undo it in here. 185 * @param env Current environment. May be null because of context; e.g. pretty-printing 186 * procedure WALs where there is no 'environment' (and where Procedures that require 187 * an 'environment' won't be run).
188 */ 189 protected void bypass(TEnvironment env) { 190 this.bypass = true; 191 } 192 /** @return whether the framework should persist this procedure after the current execution step. */ 193 boolean needPersistence() { 194 return persist; 195 } 196 /** Restores the default (persist after execution); done before every execution. */ 197 void resetPersistence() { 198 persist = true; 199 } 200 /** Asks the framework to skip persisting this procedure after the current execution step. */ 201 protected final void skipPersistence() { 202 persist = false; 203 } 204 205 /** 206 * The main code of the procedure. It must be idempotent since execute() 207 * may be called multiple times in case of machine failure in the middle 208 * of the execution. 209 * @param env the environment passed to the ProcedureExecutor 210 * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the 211 * procedure is done. 212 * @throws ProcedureYieldException the procedure will be added back to the queue and retried 213 * later. 214 * @throws InterruptedException the procedure will be added back to the queue and retried later. 215 * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself 216 * and has set itself up waiting for an external event to wake it back up again. 217 */ 218 protected abstract Procedure<TEnvironment>[] execute(TEnvironment env) 219 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException; 220 221 /** 222 * The code to undo what was done by the execute() code. 223 * It is called when the procedure or one of the sub-procedures failed or an 224 * abort was requested. It should clean up all the resources created by 225 * the execute() call. The implementation must be idempotent since rollback() 226 * may be called multiple times in case of machine failure in the middle 227 * of the execution.
228 * @param env the environment passed to the ProcedureExecutor 229 * @throws IOException temporary failure, the rollback will retry later 230 * @throws InterruptedException the procedure will be added back to the queue and retried later 231 */ 232 protected abstract void rollback(TEnvironment env) 233 throws IOException, InterruptedException; 234 235 /** 236 * The abort() call is asynchronous and each procedure must decide how to deal 237 * with it, if they want to be abortable. The simplest implementation 238 * is to have an AtomicBoolean set in the abort() method and then the execute() 239 * will check if the abort flag is set or not. 240 * abort() may be called multiple times from the client, so the implementation 241 * must be idempotent. 242 * 243 * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification 244 * that allows the procedure implementor to abort. 245 * NOTE(review): the meaning of the boolean return value is not documented here; presumably it 245 * reports whether the abort request was accepted -- confirm against ProcedureExecutor. 245 */ 246 protected abstract boolean abort(TEnvironment env); 247 248 /** 249 * The user-level code of the procedure may have some state to 250 * persist (e.g. input arguments or current position in the processing state) to 251 * be able to resume on failure. 252 * @param serializer stores the serializable state 253 */ 254 protected abstract void serializeStateData(ProcedureStateSerializer serializer) 255 throws IOException; 256 257 /** 258 * Called on store load to allow the user to decode the previously serialized 259 * state. 260 * @param serializer contains the serialized state 261 */ 262 protected abstract void deserializeStateData(ProcedureStateSerializer serializer) 263 throws IOException; 264 265 /** 266 * The {@link #doAcquireLock(Object, ProcedureStore)} method is split into two steps: first, it will 267 * call us to determine whether we need to wait for initialization; second, it will call 268 * {@link #acquireLock(Object)} to actually handle the lock for this procedure.
269 * <p/> 270 * This is because when the master restarts, we need to restore the lock state for all the 271 * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the 272 * {@link ProcedureExecutor} will be started before the master finishes initialization (as it is part 273 * of the initialization!), so we need to split the code into two steps, and when restoring, we just 274 * restore the lock part and ignore the waitInitialized part. Otherwise there will be a deadlock. 275 * @return true means we need to wait until the environment has been initialized, otherwise false. 275 * The default implementation always returns false. 276 */ 277 protected boolean waitInitialized(TEnvironment env) { 278 return false; 279 } 280 281 /** 282 * The user should override this method if they need a lock on an Entity. A lock can be anything, 283 * and it is up to the implementor. The Procedure Framework will call this method just before it 284 * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to 285 * execute. 286 * <p/> 287 * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other 288 * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}. 289 * <p/> 290 * Example: in our Master we can execute requests in parallel for different tables. We can create 291 * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is 292 * queued waiting for that specific table create to happen.
293 * <p/> 294 * There are 3 LockState values: 295 * <ul> 296 * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to 297 * execute.</li> 298 * <li>LOCK_YIELD_WAIT should be returned when the proc does not have the lock and the framework should 299 * take care of re-adding the procedure back to the runnable set for retry.</li> 300 * <li>LOCK_EVENT_WAIT should be returned when the proc does not have the lock and someone will take 301 * care of re-adding the procedure back to the runnable set when the lock is available.</li> 302 * </ul> 303 * @return the lock state as described above. The default implementation always returns 303 * LOCK_ACQUIRED. 304 */ 305 protected LockState acquireLock(TEnvironment env) { 306 return LockState.LOCK_ACQUIRED; 307 } 308 309 /** 310 * The user should override this method, and release the lock if necessary. The default 310 * implementation is a no-op. 311 */ 312 protected void releaseLock(TEnvironment env) { 313 // no-op 314 } 315 316 /** 317 * Used to keep the procedure lock even when the procedure is yielding or suspended. 318 * @return true if the procedure should hold on to the lock until completionCleanup(); the default 318 * implementation returns false. 319 */ 320 protected boolean holdLock(TEnvironment env) { 321 return false; 322 } 323 324 /** 325 * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)} 326 * returns true, the procedure executor will call acquireLock() once and thereafter 327 * not call {@link #releaseLock(Object)} until the Procedure is done. (Normally, it calls 328 * release/acquire around each invocation of {@link #execute(Object)}.) 329 * @see #holdLock(Object) 330 * @return true if the procedure has the lock, false otherwise. 331 */ 332 public final boolean hasLock() { 333 return locked; 334 } 335 336 /** 337 * Called when the procedure is loaded for replay. 338 * The procedure implementor may use this method to perform some quick 339 * operation before replay. 340 * e.g. failing the procedure if the state on replay may be unknown.
341 * The default implementation is a no-op. 341 */ 342 protected void beforeReplay(TEnvironment env) { 343 // no-op 344 } 345 346 /** 347 * Called when the procedure is ready to be added to the queue after 348 * the loading/replay operation. The default implementation is a no-op. 349 */ 350 protected void afterReplay(TEnvironment env) { 351 // no-op 352 } 353 354 /** 355 * Called when the procedure is marked as completed (success or rollback). 356 * The procedure implementor may use this method to clean up in-memory state. 357 * This operation will not be retried on failure. If a procedure took a lock, 358 * it will have been released when this method runs. 359 */ 360 protected void completionCleanup(TEnvironment env) { 361 // no-op 362 } 363 364 /** 365 * By default, the procedure framework/executor will try to run procedures start to finish. 366 * Return true to make the executor yield between each execution step to 367 * give other procedures a chance to run. 368 * @param env the environment passed to the ProcedureExecutor 369 * @return Return true if the executor should yield on completion of an execution step. 370 * Defaults to return false. 371 */ 372 protected boolean isYieldAfterExecutionStep(TEnvironment env) { 373 return false; 374 } 375 376 /** 377 * By default, the executor will keep the procedure result around until 378 * the eviction TTL is expired. The client can cut down the waiting time 379 * by requesting that the result is removed from the executor. 380 * In case of a system-started procedure, we can force the executor to auto-ack. 381 * @param env the environment passed to the ProcedureExecutor 382 * @return true if the executor should wait for the client ack for the result. 383 * Defaults to return true. 384 */ 385 protected boolean shouldWaitClientAck(TEnvironment env) { 386 return true; 387 } 388 389 /** 390 * Override this method to provide procedure-specific counters for submitted count, failed 391 * count and time histogram.
392 * @param env The environment passed to the procedure executor 393 * @return Container object for procedure related metrics; the default implementation returns 393 * null, which disables the default metrics updates. 394 */ 395 protected ProcedureMetrics getProcedureMetrics(TEnvironment env) { 396 return null; 397 } 398 399 /** 400 * This function will be called just when the procedure is submitted for execution. Override this 401 * method to update the metrics at the beginning of the procedure. The default implementation 402 * increments the submitted counter if {@link #getProcedureMetrics(Object)} returns a non-null 403 * {@link ProcedureMetrics} whose submitted counter is itself non-null. 404 */ 405 protected void updateMetricsOnSubmit(TEnvironment env) { 406 ProcedureMetrics metrics = getProcedureMetrics(env); 407 if (metrics == null) { 408 return; 409 } 410 411 Counter submittedCounter = metrics.getSubmittedCounter(); 412 if (submittedCounter != null) { 413 submittedCounter.increment(); 414 } 415 } 416 417 /** 418 * This function will be called just after procedure execution is finished. Override this method 419 * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns 420 * non-null {@link ProcedureMetrics}, the default implementation adds the runtime of a procedure to a 421 * time histogram for successfully completed procedures, and increments the failed counter for failed 422 * procedures. Null histograms/counters are skipped. 423 * <p/> 424 * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including 425 * successfully finished siblings, this function may get called twice in certain cases for certain 426 * procedures. Explore further if this can be called once.
427 * @param env The environment passed to the procedure executor 428 * @param runtime Runtime of the procedure in milliseconds 429 * @param success true if procedure is completed successfully 430 */ 431 protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) { 432 ProcedureMetrics metrics = getProcedureMetrics(env); 433 if (metrics == null) { 434 return; 435 } 436 437 if (success) { 438 Histogram timeHisto = metrics.getTimeHisto(); 439 if (timeHisto != null) { 440 timeHisto.update(runtime); 441 } 442 } else { 443 Counter failedCounter = metrics.getFailedCounter(); 444 if (failedCounter != null) { 445 failedCounter.increment(); 446 } 447 } 448 } 449 450 @Override 451 public String toString() { 452 // Return the simple String representation of the procedure. 453 return toStringSimpleSB().toString(); 454 } 455 456 /** 457 * Build the StringBuilder for the simple form of procedure string: 457 * {@code pid=<procId>[, ppid=<parentProcId>], state=<state>[, locked=true][, bypass=true]} 457 * {@code [, exception=<exception>]; <class details>}, where the bracketed parts only appear when 457 * they apply. 458 * @return the StringBuilder, so callers can append further details 459 */ 460 protected StringBuilder toStringSimpleSB() { 461 final StringBuilder sb = new StringBuilder(); 462 463 sb.append("pid="); 464 sb.append(getProcId()); 465 466 if (hasParent()) { 467 sb.append(", ppid="); 468 sb.append(getParentProcId()); 469 } 470 471 /* 472 * TODO 473 * Enable later when this is being used. 474 * Currently owner not used. 475 if (hasOwner()) { 476 sb.append(", owner="); 477 sb.append(getOwner()); 478 }*/ 479 480 sb.append(", state="); // The framework's ProcedureState, as opposed to any implementation-specific state. 481 toStringState(sb); 482 483 // Only print out locked if actually locked. Most of the time it is not.
484 if (this.locked) { 485 sb.append(", locked=").append(locked); 486 } 487 488 if (bypass) { 489 sb.append(", bypass=").append(bypass); 490 } 491 492 if (hasException()) { 493 sb.append(", exception=" + getException()); 494 } 495 496 sb.append("; "); 497 toStringClassDetails(sb); 498 499 return sb; 500 } 501 502 /** 503 * Extend the toString() information with more procedure details: appends submittedTime and 503 * lastUpdate to the simple form and, when stack indexes are set, a new line with stackIndexes. 504 */ 505 public String toStringDetails() { 506 final StringBuilder sb = toStringSimpleSB(); 507 508 sb.append(" submittedTime="); 509 sb.append(getSubmittedTime()); 510 511 sb.append(", lastUpdate="); 512 sb.append(getLastUpdate()); 513 514 final int[] stackIndices = getStackIndexes(); 515 if (stackIndices != null) { 516 sb.append("\n"); 517 sb.append("stackIndexes="); 518 sb.append(Arrays.toString(stackIndices)); 519 } 520 521 return sb.toString(); 522 } 523 /** @return only the text produced by {@link #toStringClassDetails(StringBuilder)}; used by {@link #getProcName()}. */ 524 protected String toStringClass() { 525 StringBuilder sb = new StringBuilder(); 526 toStringClassDetails(sb); 527 return sb.toString(); 528 } 529 530 /** 531 * Called from {@link #toString()} (via toStringSimpleSB) when interpolating {@link Procedure} State. Allows decorating 532 * generic Procedure State with Procedure particulars. 533 * @param builder Append current {@link ProcedureState} 534 */ 535 protected void toStringState(StringBuilder builder) { 536 builder.append(getState()); 537 } 538 539 /** 540 * Extend the toString() information with the procedure details 541 * e.g. className and parameters. The default implementation appends the fully-qualified class name. 542 * @param builder the string builder to use to append the proc specific information 543 */ 544 protected void toStringClassDetails(StringBuilder builder) { 545 builder.append(getClass().getName()); 546 } 547 548 // ========================================================================== 549 // Those fields are unchanged after initialization. 550 // 551 // Each procedure will get created from the user or during 552 // ProcedureExecutor.start() during the load() phase and then submitted 553 // to the executor.
These fields will never be changed after initialization. 554 // ========================================================================== 555 public long getProcId() { 556 return procId; 557 } 558 /** @return true if a parent procedure id has been set (i.e. it is not {@link #NO_PROC_ID}). */ 559 public boolean hasParent() { 560 return parentProcId != NO_PROC_ID; 561 } 562 563 public long getParentProcId() { 564 return parentProcId; 565 } 566 567 public long getRootProcId() { 568 return rootProcId; 569 } 570 /** @return the procedure name, as produced by {@link #toStringClass()}. */ 571 public String getProcName() { 572 return toStringClass(); 573 } 574 575 public NonceKey getNonceKey() { 576 return nonceKey; 577 } 578 579 public long getSubmittedTime() { 580 return submittedTime; 581 } 582 583 public String getOwner() { 584 return owner; 585 } 586 587 public boolean hasOwner() { 588 return owner != null; 589 } 590 591 /** 592 * Called by the ProcedureExecutor to assign the ID to the newly created procedure. 592 * Also stamps the submitted time with the current time and moves the state to RUNNABLE 592 * (which, through setState, also refreshes lastUpdate). 593 */ 594 @VisibleForTesting 595 protected void setProcId(long procId) { 596 this.procId = procId; 597 this.submittedTime = EnvironmentEdgeManager.currentTime(); 598 setState(ProcedureState.RUNNABLE); 599 } 600 601 /** 602 * Called by the ProcedureExecutor to assign the parent to the newly created procedure. 603 */ 604 protected void setParentProcId(long parentProcId) { 605 this.parentProcId = parentProcId; 606 } 607 608 protected void setRootProcId(long rootProcId) { 609 this.rootProcId = rootProcId; 610 } 611 612 /** 613 * Called by the ProcedureExecutor to set the nonce key of the newly created procedure. 614 */ 615 @VisibleForTesting 616 protected void setNonceKey(NonceKey nonceKey) { 617 this.nonceKey = nonceKey; 618 } 619 /** Sets the owner; names for which {@code StringUtils.isEmpty} is true are stored as {@code null}. */ 620 @VisibleForTesting 621 public void setOwner(String owner) { 622 this.owner = StringUtils.isEmpty(owner) ? null : owner; 623 } 624 /** Sets the owner from the user's short name; a null user is only rejected by an assert (when enabled). */ 625 public void setOwner(User owner) { 626 assert owner != null : "expected owner to be not null"; 627 setOwner(owner.getShortName()); 628 } 629 630 /** 631 * Called on store load to initialize the Procedure internals after 632 * the creation/deserialization.
633 */ 634 protected void setSubmittedTime(long submittedTime) { 635 this.submittedTime = submittedTime; 636 } 637 638 // ========================================================================== 639 // runtime state - timeout related 640 // ========================================================================== 641 /** 642 * @param timeout timeout interval in msec; {@link #NO_TIMEOUT} means no timeout 643 */ 644 protected void setTimeout(int timeout) { 645 this.timeout = timeout; 646 } 647 /** @return true if a timeout other than {@link #NO_TIMEOUT} has been set. */ 648 public boolean hasTimeout() { 649 return timeout != NO_TIMEOUT; 650 } 651 652 /** 653 * @return the timeout in msec, or {@link #NO_TIMEOUT} if none was set 654 */ 655 public int getTimeout() { 656 return timeout; 657 } 658 659 /** 660 * Called on store load to initialize the Procedure internals after 661 * the creation/deserialization. 662 */ 663 protected void setLastUpdate(long lastUpdate) { 664 this.lastUpdate = lastUpdate; 665 } 666 667 /** 668 * Called by ProcedureExecutor after each time a procedure step is executed; also called by 668 * setState on every state change. Stamps lastUpdate with the current time. 669 */ 670 protected void updateTimestamp() { 671 this.lastUpdate = EnvironmentEdgeManager.currentTime(); 672 } 673 674 public long getLastUpdate() { 675 return lastUpdate; 676 } 677 678 /** 679 * Timestamp at which the next timeout fires, computed as lastUpdate + timeout. 680 * Called by the ProcedureExecutor if the procedure has timeout set and 681 * the procedure is in the waiting queue. Only meaningful when {@link #hasTimeout()} is true: 681 * with {@link #NO_TIMEOUT} the result is simply lastUpdate - 1. 682 * @return the timestamp of the next timeout. 683 */ 684 protected long getTimeoutTimestamp() { 685 return getLastUpdate() + getTimeout(); 686 } 687 688 // ========================================================================== 689 // runtime state 690 // ========================================================================== 691 /** 692 * @return the time elapsed between the last update and the start time of the procedure.
693 */ 694 public long elapsedTime() { 695 return getLastUpdate() - getSubmittedTime(); 696 } 697 698 /** 699 * @return the serialized result if any, otherwise null. The internal array is returned, not a copy. 700 */ 701 public byte[] getResult() { 702 return result; 703 } 704 705 /** 706 * The procedure may leave a "result" on completion. The array is stored as-is (not copied). 707 * @param result the serialized result that will be passed to the client 708 */ 709 protected void setResult(byte[] result) { 710 this.result = result; 711 } 712 713 /** 714 * Will only be called when loading procedures from the procedure store, where we need to record 715 * whether the procedure has already held a lock. Later we will call 716 * {@link #restoreLock(Object)} to actually acquire the lock. 717 */ 718 final void lockedWhenLoading() { 719 this.lockedWhenLoading = true; 720 } 721 722 /** 723 * Can only be called when restarting, before the procedure is actually executed, as after we 724 * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset 725 * {@link #lockedWhenLoading} to false. 726 * <p/> 727 * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in 728 * front of a queue. 729 */ 730 public boolean isLockedWhenLoading() { 731 return lockedWhenLoading; 732 } 733 734 // ============================================================================================== 735 // Runtime state, updated every operation by the ProcedureExecutor 736 // 737 // There is always 1 thread at a time operating on the state of the procedure. 738 // The ProcedureExecutor may check and set states, or some Procedure may 739 // update its own state, but there are no concurrent updates. We use synchronized here 740 // just because the procedure can get scheduled on different executor threads on each step. 741 // ============================================================================================== 742 743 /** 744 * @return true if the procedure is in a RUNNABLE state.
745 */ 746 public synchronized boolean isRunnable() { 747 return state == ProcedureState.RUNNABLE; 748 } 749 /** @return true if the procedure is still in the INITIALIZING state. */ 750 public synchronized boolean isInitializing() { 751 return state == ProcedureState.INITIALIZING; 752 } 753 754 /** 755 * @return true if the procedure has failed (state FAILED or ROLLEDBACK). It may or may not have rolled back. 756 */ 757 public synchronized boolean isFailed() { 758 return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK; 759 } 760 761 /** 762 * @return true if the procedure is finished successfully: state SUCCESS and no exception recorded. 763 */ 764 public synchronized boolean isSuccess() { 765 return state == ProcedureState.SUCCESS && !hasException(); 766 } 767 768 /** 769 * @return true if the procedure is finished. The Procedure may be completed successfully or 770 * rolled back. 771 */ 772 public synchronized boolean isFinished() { 773 return isSuccess() || state == ProcedureState.ROLLEDBACK; 774 } 775 776 /** 777 * @return true if the procedure is waiting for a child to finish or for an external event 777 * (state WAITING or WAITING_TIMEOUT). 778 */ 779 public synchronized boolean isWaiting() { 780 switch (state) { 781 case WAITING: 782 case WAITING_TIMEOUT: 783 return true; 784 default: 785 break; 786 } 787 return false; 788 } 789 /** Sets the framework state and refreshes lastUpdate via {@link #updateTimestamp()}. */ 790 @VisibleForTesting 791 protected synchronized void setState(final ProcedureState state) { 792 this.state = state; 793 updateTimestamp(); 794 } 795 796 public synchronized ProcedureState getState() { 797 return state; 798 } 799 /** Records a failure, wrapping {@code cause} in a RemoteProcedureException tagged with {@code source}. */ 800 protected void setFailure(final String source, final Throwable cause) { 801 setFailure(new RemoteProcedureException(source, cause)); 802 } 803 /** 803 * Records the exception and, unless the procedure is already finished, moves it to FAILED. 803 * Because the exception is stored before the isFinished() check, a non-null exception makes 803 * isSuccess() false, so only a ROLLEDBACK procedure keeps its current state. 803 */ 804 protected synchronized void setFailure(final RemoteProcedureException exception) { 805 this.exception = exception; 806 if (!isFinished()) { 807 setState(ProcedureState.FAILED); 808 } 809 } 810 /** Records an abort failure whose cause is a ProcedureAbortedException carrying {@code msg}. */ 811 protected void setAbortFailure(final String source, final String msg) { 812 setFailure(source, new ProcedureAbortedException(msg)); 813 } 814 815 /** 816 * Called by the ProcedureExecutor when the timeout set by setTimeout()
is expired. 817 * <p/> 818 * Another usage for this method is to implement retrying. A procedure can set the state to 819 * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a 820 * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a 821 * call {@link #setTimeout(int)} method to set the timeout. And you should also override this 822 * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the 823 * timeout event has been handled. 824 * @return true to let the framework handle the timeout as abort, false in case the procedure 825 * handled the timeout itself. 826 */ 827 protected synchronized boolean setTimeoutFailure(TEnvironment env) { 828 if (state == ProcedureState.WAITING_TIMEOUT) { 829 long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate; 830 setFailure("ProcedureExecutor", 831 new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff))); 832 return true; 833 } 834 return false; 835 } 836 837 public synchronized boolean hasException() { 838 return exception != null; 839 } 840 841 public synchronized RemoteProcedureException getException() { 842 return exception; 843 } 844 845 /** 846 * Called by the ProcedureExecutor on procedure-load to restore the latch state 847 */ 848 protected synchronized void setChildrenLatch(int numChildren) { 849 this.childrenLatch = numChildren; 850 if (LOG.isTraceEnabled()) { 851 LOG.trace("CHILD LATCH INCREMENT SET " + 852 this.childrenLatch, new Throwable(this.toString())); 853 } 854 } 855 856 /** 857 * Called by the ProcedureExecutor on procedure-load to restore the latch state 858 */ 859 protected synchronized void incChildrenLatch() { 860 // TODO: can this be inferred from the stack? I think so... 
861 this.childrenLatch++; 862 if (LOG.isTraceEnabled()) { 863 LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString())); 864 } 865 } 866 867 /** 868 * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. 869 */ 870 private synchronized boolean childrenCountDown() { 871 assert childrenLatch > 0: this; 872 boolean b = --childrenLatch == 0; 873 if (LOG.isTraceEnabled()) { 874 LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString())); 875 } 876 return b; 877 } 878 879 /** 880 * Try to set this procedure into RUNNABLE state. 881 * Succeeds if all subprocedures/children are done. 882 * @return True if we were able to move procedure to RUNNABLE state. 883 */ 884 synchronized boolean tryRunnable() { 885 // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT 886 if (getState() == ProcedureState.WAITING && childrenCountDown()) { 887 setState(ProcedureState.RUNNABLE); 888 return true; 889 } else { 890 return false; 891 } 892 } 893 894 protected synchronized boolean hasChildren() { 895 return childrenLatch > 0; 896 } 897 898 protected synchronized int getChildrenLatch() { 899 return childrenLatch; 900 } 901 902 /** 903 * Called by the RootProcedureState on procedure execution. 904 * Each procedure store its stack-index positions. 
905 */ 906 protected synchronized void addStackIndex(final int index) { 907 if (stackIndexes == null) { 908 stackIndexes = new int[] { index }; 909 } else { 910 int count = stackIndexes.length; 911 stackIndexes = Arrays.copyOf(stackIndexes, count + 1); 912 stackIndexes[count] = index; 913 } 914 } 915 916 protected synchronized boolean removeStackIndex() { 917 if (stackIndexes != null && stackIndexes.length > 1) { 918 stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1); 919 return false; 920 } else { 921 stackIndexes = null; 922 return true; 923 } 924 } 925 926 /** 927 * Called on store load to initialize the Procedure internals after 928 * the creation/deserialization. 929 */ 930 protected synchronized void setStackIndexes(final List<Integer> stackIndexes) { 931 this.stackIndexes = new int[stackIndexes.size()]; 932 for (int i = 0; i < this.stackIndexes.length; ++i) { 933 this.stackIndexes[i] = stackIndexes.get(i); 934 } 935 } 936 937 protected synchronized boolean wasExecuted() { 938 return stackIndexes != null; 939 } 940 941 protected synchronized int[] getStackIndexes() { 942 return stackIndexes; 943 } 944 945 // ========================================================================== 946 // Internal methods - called by the ProcedureExecutor 947 // ========================================================================== 948 949 /** 950 * Internal method called by the ProcedureExecutor that starts the user-level code execute(). 951 * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and 952 * skip out without changing states or releasing any locks held. 
953 */ 954 protected Procedure<TEnvironment>[] doExecute(TEnvironment env) 955 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 956 try { 957 updateTimestamp(); 958 if (bypass) { 959 LOG.info("{} bypassed, returning null to finish it", this); 960 return null; 961 } 962 return execute(env); 963 } finally { 964 updateTimestamp(); 965 } 966 } 967 968 /** 969 * Internal method called by the ProcedureExecutor that starts the user-level code rollback(). 970 */ 971 protected void doRollback(TEnvironment env) 972 throws IOException, InterruptedException { 973 try { 974 updateTimestamp(); 975 if (bypass) { 976 LOG.info("{} bypassed, skipping rollback", this); 977 return; 978 } 979 rollback(env); 980 } finally { 981 updateTimestamp(); 982 } 983 } 984 985 final void restoreLock(TEnvironment env) { 986 if (!lockedWhenLoading) { 987 LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this); 988 return; 989 } 990 991 if (isFinished()) { 992 LOG.debug("{} is already finished, skip acquiring lock.", this); 993 return; 994 } 995 996 if (isBypass()) { 997 LOG.debug("{} is already bypassed, skip acquiring lock.", this); 998 return; 999 } 1000 // this can happen if the parent stores the sub procedures but before it can 1001 // release its lock, the master restarts 1002 if (getState() == ProcedureState.WAITING && !holdLock(env)) { 1003 LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this); 1004 lockedWhenLoading = false; 1005 return; 1006 } 1007 LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this); 1008 LockState state = acquireLock(env); 1009 assert state == LockState.LOCK_ACQUIRED; 1010 } 1011 1012 /** 1013 * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock(). 
1014 */ 1015 final LockState doAcquireLock(TEnvironment env, ProcedureStore store) { 1016 if (waitInitialized(env)) { 1017 return LockState.LOCK_EVENT_WAIT; 1018 } 1019 if (lockedWhenLoading) { 1020 // reset it so we will not consider it anymore 1021 lockedWhenLoading = false; 1022 locked = true; 1023 // Here we return without persist the locked state, as lockedWhenLoading is true means 1024 // that the locked field of the procedure stored in procedure store is true, so we do not need 1025 // to store it again. 1026 return LockState.LOCK_ACQUIRED; 1027 } 1028 LockState state = acquireLock(env); 1029 if (state == LockState.LOCK_ACQUIRED) { 1030 locked = true; 1031 // persist that we have held the lock. This must be done before we actually execute the 1032 // procedure, otherwise when restarting, we may consider the procedure does not have a lock, 1033 // but it may have already done some changes as we have already executed it, and if another 1034 // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do 1035 // not expect that another procedure can be executed in the middle. 1036 store.update(this); 1037 } 1038 return state; 1039 } 1040 1041 /** 1042 * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock(). 1043 */ 1044 final void doReleaseLock(TEnvironment env, ProcedureStore store) { 1045 locked = false; 1046 // persist that we have released the lock. This must be done before we actually release the 1047 // lock. Another procedure may take this lock immediately after we release the lock, and if we 1048 // crash before persist the information that we have already released the lock, then when 1049 // restarting there will be two procedures which both have the lock and cause problems. 
1050 if (getState() != ProcedureState.ROLLEDBACK) { 1051 // If the state is ROLLEDBACK, it means that we have already deleted the procedure from 1052 // procedure store, so do not need to log the release operation any more. 1053 store.update(this); 1054 } 1055 releaseLock(env); 1056 } 1057 1058 @Override 1059 public int compareTo(final Procedure<TEnvironment> other) { 1060 return Long.compare(getProcId(), other.getProcId()); 1061 } 1062 1063 // ========================================================================== 1064 // misc utils 1065 // ========================================================================== 1066 1067 /** 1068 * Get an hashcode for the specified Procedure ID 1069 * @return the hashcode for the specified procId 1070 */ 1071 public static long getProcIdHashCode(long procId) { 1072 long h = procId; 1073 h ^= h >> 16; 1074 h *= 0x85ebca6b; 1075 h ^= h >> 13; 1076 h *= 0xc2b2ae35; 1077 h ^= h >> 16; 1078 return h; 1079 } 1080 1081 /** 1082 * Helper to lookup the root Procedure ID given a specified procedure. 1083 */ 1084 protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures, 1085 Procedure<T> proc) { 1086 while (proc.hasParent()) { 1087 proc = procedures.get(proc.getParentProcId()); 1088 if (proc == null) { 1089 return null; 1090 } 1091 } 1092 return proc.getProcId(); 1093 } 1094 1095 /** 1096 * @param a the first procedure to be compared. 1097 * @param b the second procedure to be compared. 1098 * @return true if the two procedures have the same parent 1099 */ 1100 public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) { 1101 return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId()); 1102 } 1103}