001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.List; 024import java.util.Map; 025import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 026import org.apache.hadoop.hbase.metrics.Counter; 027import org.apache.hadoop.hbase.metrics.Histogram; 028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 029import org.apache.hadoop.hbase.procedure2.util.StringUtils; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032import org.apache.hadoop.hbase.util.NonceKey; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 040 041/** 042 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate, 043 * stack-indexes, etc. 044 * <p/> 045 * Procedures are run by a {@link ProcedureExecutor} instance. 
 * They are submitted and then the ProcedureExecutor keeps calling {@link #execute(Object)} until
 * the Procedure is done. Execute may be called multiple times in the case of failure or a restart,
 * so code must be idempotent. The return from an execute call is either: null to indicate we are
 * done; ourself if there is more to do; or, a set of sub-procedures that need to be run to
 * completion before the framework resumes our execution.
 * <p/>
 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps
 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the
 * ProcedureState enum from protos:
 * <ul>
 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may
 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to
 * ROLLEDBACK state.</li>
 * <li>{@link #isSuccess()} A procedure has completed successfully without exception.</li>
 * <li>{@link #isFinished()} As a procedure in FAILED state will be retried forever for rollback,
 * the only condition under which the scheduler/executor will drop a procedure from further
 * processing is when its state is ROLLEDBACK or isSuccess() returns true. This is a terminal state
 * of the procedure.</li>
 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states
 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li>
 * </ul>
 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their
 * own state. This can lead to confusion. Try to keep the two distinct.
 * <p/>
 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback
 * step is supposed to cleanup the resources created during the execute() step.
 * In case of failure and restart, rollback() may be called multiple times, so again the code must
 * be idempotent.
 * <p/>
 * Procedure can be made to respect a locking regime. It has acquire/release methods as well as an
 * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be
 * locked for the life of a procedure -- not just the calls to execute -- then implementations
 * should say so with the {@link #holdLock(Object)} method.
 * <p/>
 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846),
 * the implementation is a bit tricky so we add some comments here about it.
 * <ul>
 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
 * whether we have the lock. We will set it to {@code true} in
 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
 * more.</li>
 * <li>Also added a locked field in the proto message. When storing, the field will be set according
 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
 * message is {@code true}.</li>
 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
 * need to wait until master is initialized. So the solution here is that we introduced a new
 * method called {@link #waitInitialized(Object)} in Procedure, and moved the wait-for-master-
 * initialized code from {@link #acquireLock(Object)} to this method.
 * And we added a restoreLock method to Procedure: if {@link #lockedWhenLoading} is {@code true},
 * we will call {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to
 * true. And later when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}; if it is
 * {@code true}, then we just set the {@link #locked} field to true and return, without actually
 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
 * </ul>
 * <p/>
 * Procedures can be suspended or put in wait state with a callback that gets executed on
 * Procedure-specified timeout. See {@link #setTimeout(int)}, and
 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
 * class for an example usage.
 * <p/>
 * There are hooks for collecting metrics on submit of the procedure and on finish. See
 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
111 */ 112@InterfaceAudience.Private 113public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> { 114 private static final Logger LOG = LoggerFactory.getLogger(Procedure.class); 115 public static final long NO_PROC_ID = -1; 116 protected static final int NO_TIMEOUT = -1; 117 118 public enum LockState { 119 LOCK_ACQUIRED, // Lock acquired and ready to execute 120 LOCK_YIELD_WAIT, // Lock not acquired, framework needs to yield 121 LOCK_EVENT_WAIT, // Lock not acquired, an event will yield the procedure 122 } 123 124 // Unchanged after initialization 125 private NonceKey nonceKey = null; 126 private String owner = null; 127 private long parentProcId = NO_PROC_ID; 128 private long rootProcId = NO_PROC_ID; 129 private long procId = NO_PROC_ID; 130 private long submittedTime; 131 132 // Runtime state, updated every operation 133 private ProcedureState state = ProcedureState.INITIALIZING; 134 private RemoteProcedureException exception = null; 135 private int[] stackIndexes = null; 136 private int childrenLatch = 0; 137 138 private volatile int timeout = NO_TIMEOUT; 139 private volatile long lastUpdate; 140 141 private volatile byte[] result = null; 142 143 private volatile boolean locked = false; 144 145 private boolean lockedWhenLoading = false; 146 147 /** 148 * Used for override complete of the procedure without actually doing any logic in the procedure. 149 * If bypass is set to true, when executing it will return null when 150 * {@link #doExecute(Object)} is called to finish the procedure and release any locks 151 * it may currently hold. The bypass does cleanup around the Procedure as far as the 152 * Procedure framework is concerned. It does not clean any internal state that the 153 * Procedure's themselves may have set. That is for the Procedures to do themselves 154 * when bypass is called. 
They should override bypass and do their cleanup in the 155 * overridden bypass method (be sure to call the parent bypass to ensure proper 156 * processing). 157 * <p></p>Bypassing a procedure is not like aborting. Aborting a procedure will trigger 158 * a rollback. And since the {@link #abort(Object)} method is overrideable 159 * Some procedures may have chosen to ignore the aborting. 160 */ 161 private volatile boolean bypass = false; 162 163 /** 164 * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to 165 * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the 166 * persistence of the procedure. 167 * <p/> 168 * This is useful when the procedure is in error and you want to retry later. The retry interval 169 * and the number of retries are usually not critical so skip the persistence can save some 170 * resources, and also speed up the restart processing. 171 * <p/> 172 * Notice that this value will be reset to true every time before execution. And when rolling back 173 * we do not test this value. 174 */ 175 private boolean persist = true; 176 177 public boolean isBypass() { 178 return bypass; 179 } 180 181 /** 182 * Set the bypass to true. 183 * Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for 184 * now. 185 * DO NOT use this method alone, since we can't just bypass one single procedure. We need to 186 * bypass its ancestor too. If your Procedure has set state, it needs to undo it in here. 187 * @param env Current environment. May be null because of context; e.g. pretty-printing 188 * procedure WALs where there is no 'environment' (and where Procedures that require 189 * an 'environment' won't be run. 
190 */ 191 protected void bypass(TEnvironment env) { 192 this.bypass = true; 193 } 194 195 boolean needPersistence() { 196 return persist; 197 } 198 199 void resetPersistence() { 200 persist = true; 201 } 202 203 protected final void skipPersistence() { 204 persist = false; 205 } 206 207 /** 208 * The main code of the procedure. It must be idempotent since execute() 209 * may be called multiple times in case of machine failure in the middle 210 * of the execution. 211 * @param env the environment passed to the ProcedureExecutor 212 * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the 213 * procedure is done. 214 * @throws ProcedureYieldException the procedure will be added back to the queue and retried later. 215 * @throws InterruptedException the procedure will be added back to the queue and retried later. 216 * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself and 217 * has set itself up waiting for an external event to wake it back up again. 218 */ 219 protected abstract Procedure<TEnvironment>[] execute(TEnvironment env) 220 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException; 221 222 /** 223 * The code to undo what was done by the execute() code. 224 * It is called when the procedure or one of the sub-procedures failed or an 225 * abort was requested. It should cleanup all the resources created by 226 * the execute() call. The implementation must be idempotent since rollback() 227 * may be called multiple time in case of machine failure in the middle 228 * of the execution. 
229 * @param env the environment passed to the ProcedureExecutor 230 * @throws IOException temporary failure, the rollback will retry later 231 * @throws InterruptedException the procedure will be added back to the queue and retried later 232 */ 233 protected abstract void rollback(TEnvironment env) 234 throws IOException, InterruptedException; 235 236 /** 237 * The abort() call is asynchronous and each procedure must decide how to deal 238 * with it, if they want to be abortable. The simplest implementation 239 * is to have an AtomicBoolean set in the abort() method and then the execute() 240 * will check if the abort flag is set or not. 241 * abort() may be called multiple times from the client, so the implementation 242 * must be idempotent. 243 * 244 * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification 245 * that allows the procedure implementor abort. 246 */ 247 protected abstract boolean abort(TEnvironment env); 248 249 /** 250 * The user-level code of the procedure may have some state to 251 * persist (e.g. input arguments or current position in the processing state) to 252 * be able to resume on failure. 253 * @param serializer stores the serializable state 254 */ 255 protected abstract void serializeStateData(ProcedureStateSerializer serializer) 256 throws IOException; 257 258 /** 259 * Called on store load to allow the user to decode the previously serialized 260 * state. 261 * @param serializer contains the serialized state 262 */ 263 protected abstract void deserializeStateData(ProcedureStateSerializer serializer) 264 throws IOException; 265 266 /** 267 * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will 268 * call us to determine whether we need to wait for initialization, second, it will call 269 * {@link #acquireLock(Object)} to actually handle the lock for this procedure. 
270 * <p/> 271 * This is because that when master restarts, we need to restore the lock state for all the 272 * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the 273 * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part 274 * of the initialization!), so we need to split the code into two steps, and when restore, we just 275 * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock. 276 * @return true means we need to wait until the environment has been initialized, otherwise true. 277 */ 278 protected boolean waitInitialized(TEnvironment env) { 279 return false; 280 } 281 282 /** 283 * The user should override this method if they need a lock on an Entity. A lock can be anything, 284 * and it is up to the implementor. The Procedure Framework will call this method just before it 285 * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to 286 * execute. 287 * <p/> 288 * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other 289 * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}. 290 * <p/> 291 * Example: in our Master we can execute request in parallel for different tables. We can create 292 * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is 293 * queued waiting that specific table create to happen. 
294 * <p/> 295 * There are 3 LockState: 296 * <ul> 297 * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to 298 * execute.</li> 299 * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should 300 * take care of readding the procedure back to the runnable set for retry</li> 301 * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take 302 * care of readding the procedure back to the runnable set when the lock is available.</li> 303 * </ul> 304 * @return the lock state as described above. 305 */ 306 protected LockState acquireLock(TEnvironment env) { 307 return LockState.LOCK_ACQUIRED; 308 } 309 310 /** 311 * The user should override this method, and release lock if necessary. 312 */ 313 protected void releaseLock(TEnvironment env) { 314 // no-op 315 } 316 317 /** 318 * Used to keep the procedure lock even when the procedure is yielding or suspended. 319 * @return true if the procedure should hold on the lock until completionCleanup() 320 */ 321 protected boolean holdLock(TEnvironment env) { 322 return false; 323 } 324 325 /** 326 * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)} 327 * returns true, the procedure executor will call acquireLock() once and thereafter 328 * not call {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls 329 * release/acquire around each invocation of {@link #execute(Object)}. 330 * @see #holdLock(Object) 331 * @return true if the procedure has the lock, false otherwise. 332 */ 333 public final boolean hasLock() { 334 return locked; 335 } 336 337 /** 338 * Called when the procedure is loaded for replay. 339 * The procedure implementor may use this method to perform some quick 340 * operation before replay. 341 * e.g. failing the procedure if the state on replay may be unknown. 
342 */ 343 protected void beforeReplay(TEnvironment env) { 344 // no-op 345 } 346 347 /** 348 * Called when the procedure is ready to be added to the queue after 349 * the loading/replay operation. 350 */ 351 protected void afterReplay(TEnvironment env) { 352 // no-op 353 } 354 355 /** 356 * Called when the procedure is marked as completed (success or rollback). 357 * The procedure implementor may use this method to cleanup in-memory states. 358 * This operation will not be retried on failure. If a procedure took a lock, 359 * it will have been released when this method runs. 360 */ 361 protected void completionCleanup(TEnvironment env) { 362 // no-op 363 } 364 365 /** 366 * By default, the procedure framework/executor will try to run procedures start to finish. 367 * Return true to make the executor yield between each execution step to 368 * give other procedures a chance to run. 369 * @param env the environment passed to the ProcedureExecutor 370 * @return Return true if the executor should yield on completion of an execution step. 371 * Defaults to return false. 372 */ 373 protected boolean isYieldAfterExecutionStep(TEnvironment env) { 374 return false; 375 } 376 377 /** 378 * By default, the executor will keep the procedure result around util 379 * the eviction TTL is expired. The client can cut down the waiting time 380 * by requesting that the result is removed from the executor. 381 * In case of system started procedure, we can force the executor to auto-ack. 382 * @param env the environment passed to the ProcedureExecutor 383 * @return true if the executor should wait the client ack for the result. 384 * Defaults to return true. 385 */ 386 protected boolean shouldWaitClientAck(TEnvironment env) { 387 return true; 388 } 389 390 /** 391 * Override this method to provide procedure specific counters for submitted count, failed 392 * count and time histogram. 
393 * @param env The environment passed to the procedure executor 394 * @return Container object for procedure related metric 395 */ 396 protected ProcedureMetrics getProcedureMetrics(TEnvironment env) { 397 return null; 398 } 399 400 /** 401 * This function will be called just when procedure is submitted for execution. Override this 402 * method to update the metrics at the beginning of the procedure. The default implementation 403 * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null 404 * {@link ProcedureMetrics}. 405 */ 406 protected void updateMetricsOnSubmit(TEnvironment env) { 407 ProcedureMetrics metrics = getProcedureMetrics(env); 408 if (metrics == null) { 409 return; 410 } 411 412 Counter submittedCounter = metrics.getSubmittedCounter(); 413 if (submittedCounter != null) { 414 submittedCounter.increment(); 415 } 416 } 417 418 /** 419 * This function will be called just after procedure execution is finished. Override this method 420 * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns 421 * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a 422 * time histogram for successfully completed procedures. Increments failed counter for failed 423 * procedures. 424 * <p/> 425 * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including 426 * successfully finished siblings, this function may get called twice in certain cases for certain 427 * procedures. Explore further if this can be called once. 
428 * @param env The environment passed to the procedure executor 429 * @param runtime Runtime of the procedure in milliseconds 430 * @param success true if procedure is completed successfully 431 */ 432 protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) { 433 ProcedureMetrics metrics = getProcedureMetrics(env); 434 if (metrics == null) { 435 return; 436 } 437 438 if (success) { 439 Histogram timeHisto = metrics.getTimeHisto(); 440 if (timeHisto != null) { 441 timeHisto.update(runtime); 442 } 443 } else { 444 Counter failedCounter = metrics.getFailedCounter(); 445 if (failedCounter != null) { 446 failedCounter.increment(); 447 } 448 } 449 } 450 451 @Override 452 public String toString() { 453 // Return the simple String presentation of the procedure. 454 return toStringSimpleSB().toString(); 455 } 456 457 /** 458 * Build the StringBuilder for the simple form of procedure string. 459 * @return the StringBuilder 460 */ 461 protected StringBuilder toStringSimpleSB() { 462 final StringBuilder sb = new StringBuilder(); 463 464 sb.append("pid="); 465 sb.append(getProcId()); 466 467 if (hasParent()) { 468 sb.append(", ppid="); 469 sb.append(getParentProcId()); 470 } 471 472 /** 473 * TODO 474 * Enable later when this is being used. 475 * Currently owner not used. 476 if (hasOwner()) { 477 sb.append(", owner="); 478 sb.append(getOwner()); 479 }*/ 480 481 sb.append(", state="); // pState for Procedure State as opposed to any other kind. 482 toStringState(sb); 483 484 // Only print out locked if actually locked. Most of the time it is not. 
485 if (this.locked) { 486 sb.append(", locked=").append(locked); 487 } 488 489 if (bypass) { 490 sb.append(", bypass=").append(bypass); 491 } 492 493 if (hasException()) { 494 sb.append(", exception=" + getException()); 495 } 496 497 sb.append("; "); 498 toStringClassDetails(sb); 499 500 return sb; 501 } 502 503 /** 504 * Extend the toString() information with more procedure details 505 */ 506 public String toStringDetails() { 507 final StringBuilder sb = toStringSimpleSB(); 508 509 sb.append(" submittedTime="); 510 sb.append(getSubmittedTime()); 511 512 sb.append(", lastUpdate="); 513 sb.append(getLastUpdate()); 514 515 final int[] stackIndices = getStackIndexes(); 516 if (stackIndices != null) { 517 sb.append("\n"); 518 sb.append("stackIndexes="); 519 sb.append(Arrays.toString(stackIndices)); 520 } 521 522 return sb.toString(); 523 } 524 525 protected String toStringClass() { 526 StringBuilder sb = new StringBuilder(); 527 toStringClassDetails(sb); 528 return sb.toString(); 529 } 530 531 /** 532 * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating 533 * generic Procedure State with Procedure particulars. 534 * @param builder Append current {@link ProcedureState} 535 */ 536 protected void toStringState(StringBuilder builder) { 537 builder.append(getState()); 538 } 539 540 /** 541 * Extend the toString() information with the procedure details 542 * e.g. className and parameters 543 * @param builder the string builder to use to append the proc specific information 544 */ 545 protected void toStringClassDetails(StringBuilder builder) { 546 builder.append(getClass().getName()); 547 } 548 549 // ========================================================================== 550 // Those fields are unchanged after initialization. 551 // 552 // Each procedure will get created from the user or during 553 // ProcedureExecutor.start() during the load() phase and then submitted 554 // to the executor. 
these fields will never be changed after initialization 555 // ========================================================================== 556 public long getProcId() { 557 return procId; 558 } 559 560 public boolean hasParent() { 561 return parentProcId != NO_PROC_ID; 562 } 563 564 public long getParentProcId() { 565 return parentProcId; 566 } 567 568 public long getRootProcId() { 569 return rootProcId; 570 } 571 572 public String getProcName() { 573 return toStringClass(); 574 } 575 576 public NonceKey getNonceKey() { 577 return nonceKey; 578 } 579 580 public long getSubmittedTime() { 581 return submittedTime; 582 } 583 584 public String getOwner() { 585 return owner; 586 } 587 588 public boolean hasOwner() { 589 return owner != null; 590 } 591 592 /** 593 * Called by the ProcedureExecutor to assign the ID to the newly created procedure. 594 */ 595 @VisibleForTesting 596 protected void setProcId(long procId) { 597 this.procId = procId; 598 this.submittedTime = EnvironmentEdgeManager.currentTime(); 599 setState(ProcedureState.RUNNABLE); 600 } 601 602 /** 603 * Called by the ProcedureExecutor to assign the parent to the newly created procedure. 604 */ 605 protected void setParentProcId(long parentProcId) { 606 this.parentProcId = parentProcId; 607 } 608 609 protected void setRootProcId(long rootProcId) { 610 this.rootProcId = rootProcId; 611 } 612 613 /** 614 * Called by the ProcedureExecutor to set the value to the newly created procedure. 615 */ 616 @VisibleForTesting 617 protected void setNonceKey(NonceKey nonceKey) { 618 this.nonceKey = nonceKey; 619 } 620 621 @VisibleForTesting 622 public void setOwner(String owner) { 623 this.owner = StringUtils.isEmpty(owner) ? null : owner; 624 } 625 626 public void setOwner(User owner) { 627 assert owner != null : "expected owner to be not null"; 628 setOwner(owner.getShortName()); 629 } 630 631 /** 632 * Called on store load to initialize the Procedure internals after 633 * the creation/deserialization. 
634 */ 635 protected void setSubmittedTime(long submittedTime) { 636 this.submittedTime = submittedTime; 637 } 638 639 // ========================================================================== 640 // runtime state - timeout related 641 // ========================================================================== 642 /** 643 * @param timeout timeout interval in msec 644 */ 645 protected void setTimeout(int timeout) { 646 this.timeout = timeout; 647 } 648 649 public boolean hasTimeout() { 650 return timeout != NO_TIMEOUT; 651 } 652 653 /** 654 * @return the timeout in msec 655 */ 656 public int getTimeout() { 657 return timeout; 658 } 659 660 /** 661 * Called on store load to initialize the Procedure internals after 662 * the creation/deserialization. 663 */ 664 protected void setLastUpdate(long lastUpdate) { 665 this.lastUpdate = lastUpdate; 666 } 667 668 /** 669 * Called by ProcedureExecutor after each time a procedure step is executed. 670 */ 671 protected void updateTimestamp() { 672 this.lastUpdate = EnvironmentEdgeManager.currentTime(); 673 } 674 675 public long getLastUpdate() { 676 return lastUpdate; 677 } 678 679 /** 680 * Timeout of the next timeout. 681 * Called by the ProcedureExecutor if the procedure has timeout set and 682 * the procedure is in the waiting queue. 683 * @return the timestamp of the next timeout. 684 */ 685 protected long getTimeoutTimestamp() { 686 return getLastUpdate() + getTimeout(); 687 } 688 689 // ========================================================================== 690 // runtime state 691 // ========================================================================== 692 /** 693 * @return the time elapsed between the last update and the start time of the procedure. 
694 */ 695 public long elapsedTime() { 696 return getLastUpdate() - getSubmittedTime(); 697 } 698 699 /** 700 * @return the serialized result if any, otherwise null 701 */ 702 public byte[] getResult() { 703 return result; 704 } 705 706 /** 707 * The procedure may leave a "result" on completion. 708 * @param result the serialized result that will be passed to the client 709 */ 710 protected void setResult(byte[] result) { 711 this.result = result; 712 } 713 714 /** 715 * Will only be called when loading procedures from procedure store, where we need to record 716 * whether the procedure has already held a lock. Later we will call 717 * {@link #restoreLock(Object, ProcedureStore)} to actually acquire the lock. 718 */ 719 final void lockedWhenLoading() { 720 this.lockedWhenLoading = true; 721 } 722 723 /** 724 * Can only be called when restarting, before the procedure actually being executed, as after we 725 * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset 726 * {@link #lockedWhenLoading} to false. 727 * <p/> 728 * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in 729 * front of a queue. 730 */ 731 public boolean isLockedWhenLoading() { 732 return lockedWhenLoading; 733 } 734 735 // ============================================================================================== 736 // Runtime state, updated every operation by the ProcedureExecutor 737 // 738 // There is always 1 thread at the time operating on the state of the procedure. 739 // The ProcedureExecutor may check and set states, or some Procecedure may 740 // update its own state. but no concurrent updates. we use synchronized here 741 // just because the procedure can get scheduled on different executor threads on each step. 742 // ============================================================================================== 743 744 /** 745 * @return true if the procedure is in a RUNNABLE state. 
746 */ 747 public synchronized boolean isRunnable() { 748 return state == ProcedureState.RUNNABLE; 749 } 750 751 public synchronized boolean isInitializing() { 752 return state == ProcedureState.INITIALIZING; 753 } 754 755 /** 756 * @return true if the procedure has failed. It may or may not have rolled back. 757 */ 758 public synchronized boolean isFailed() { 759 return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK; 760 } 761 762 /** 763 * @return true if the procedure is finished successfully. 764 */ 765 public synchronized boolean isSuccess() { 766 return state == ProcedureState.SUCCESS && !hasException(); 767 } 768 769 /** 770 * @return true if the procedure is finished. The Procedure may be completed successfully or 771 * rolledback. 772 */ 773 public synchronized boolean isFinished() { 774 return isSuccess() || state == ProcedureState.ROLLEDBACK; 775 } 776 777 /** 778 * @return true if the procedure is waiting for a child to finish or for an external event. 779 */ 780 public synchronized boolean isWaiting() { 781 switch (state) { 782 case WAITING: 783 case WAITING_TIMEOUT: 784 return true; 785 default: 786 break; 787 } 788 return false; 789 } 790 791 @VisibleForTesting 792 protected synchronized void setState(final ProcedureState state) { 793 this.state = state; 794 updateTimestamp(); 795 } 796 797 public synchronized ProcedureState getState() { 798 return state; 799 } 800 801 protected void setFailure(final String source, final Throwable cause) { 802 setFailure(new RemoteProcedureException(source, cause)); 803 } 804 805 protected synchronized void setFailure(final RemoteProcedureException exception) { 806 this.exception = exception; 807 if (!isFinished()) { 808 setState(ProcedureState.FAILED); 809 } 810 } 811 812 protected void setAbortFailure(final String source, final String msg) { 813 setFailure(source, new ProcedureAbortedException(msg)); 814 } 815 816 /** 817 * Called by the ProcedureExecutor when the timeout set by setTimeout() 
is expired. 818 * <p/> 819 * Another usage for this method is to implement retrying. A procedure can set the state to 820 * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a 821 * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a 822 * call {@link #setTimeout(int)} method to set the timeout. And you should also override this 823 * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the 824 * timeout event has been handled. 825 * @return true to let the framework handle the timeout as abort, false in case the procedure 826 * handled the timeout itself. 827 */ 828 protected synchronized boolean setTimeoutFailure(TEnvironment env) { 829 if (state == ProcedureState.WAITING_TIMEOUT) { 830 long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate; 831 setFailure("ProcedureExecutor", 832 new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff))); 833 return true; 834 } 835 return false; 836 } 837 838 public synchronized boolean hasException() { 839 return exception != null; 840 } 841 842 public synchronized RemoteProcedureException getException() { 843 return exception; 844 } 845 846 /** 847 * Called by the ProcedureExecutor on procedure-load to restore the latch state 848 */ 849 protected synchronized void setChildrenLatch(int numChildren) { 850 this.childrenLatch = numChildren; 851 if (LOG.isTraceEnabled()) { 852 LOG.trace("CHILD LATCH INCREMENT SET " + 853 this.childrenLatch, new Throwable(this.toString())); 854 } 855 } 856 857 /** 858 * Called by the ProcedureExecutor on procedure-load to restore the latch state 859 */ 860 protected synchronized void incChildrenLatch() { 861 // TODO: can this be inferred from the stack? I think so... 
862 this.childrenLatch++; 863 if (LOG.isTraceEnabled()) { 864 LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString())); 865 } 866 } 867 868 /** 869 * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. 870 */ 871 private synchronized boolean childrenCountDown() { 872 assert childrenLatch > 0: this; 873 boolean b = --childrenLatch == 0; 874 if (LOG.isTraceEnabled()) { 875 LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString())); 876 } 877 return b; 878 } 879 880 /** 881 * Try to set this procedure into RUNNABLE state. 882 * Succeeds if all subprocedures/children are done. 883 * @return True if we were able to move procedure to RUNNABLE state. 884 */ 885 synchronized boolean tryRunnable() { 886 // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT 887 if (getState() == ProcedureState.WAITING && childrenCountDown()) { 888 setState(ProcedureState.RUNNABLE); 889 return true; 890 } else { 891 return false; 892 } 893 } 894 895 protected synchronized boolean hasChildren() { 896 return childrenLatch > 0; 897 } 898 899 /** 900 * @return Count of children outstanding (Badly named). 901 */ 902 protected synchronized int getChildrenLatch() { 903 return childrenLatch; 904 } 905 906 /** 907 * Called by the RootProcedureState on procedure execution. 908 * Each procedure store its stack-index positions. 
909 */ 910 protected synchronized void addStackIndex(final int index) { 911 if (stackIndexes == null) { 912 stackIndexes = new int[] { index }; 913 } else { 914 int count = stackIndexes.length; 915 stackIndexes = Arrays.copyOf(stackIndexes, count + 1); 916 stackIndexes[count] = index; 917 } 918 } 919 920 protected synchronized boolean removeStackIndex() { 921 if (stackIndexes != null && stackIndexes.length > 1) { 922 stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1); 923 return false; 924 } else { 925 stackIndexes = null; 926 return true; 927 } 928 } 929 930 /** 931 * Called on store load to initialize the Procedure internals after 932 * the creation/deserialization. 933 */ 934 protected synchronized void setStackIndexes(final List<Integer> stackIndexes) { 935 this.stackIndexes = new int[stackIndexes.size()]; 936 for (int i = 0; i < this.stackIndexes.length; ++i) { 937 this.stackIndexes[i] = stackIndexes.get(i); 938 } 939 } 940 941 protected synchronized boolean wasExecuted() { 942 return stackIndexes != null; 943 } 944 945 protected synchronized int[] getStackIndexes() { 946 return stackIndexes; 947 } 948 949 // ========================================================================== 950 // Internal methods - called by the ProcedureExecutor 951 // ========================================================================== 952 953 /** 954 * Internal method called by the ProcedureExecutor that starts the user-level code execute(). 955 * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and 956 * skip out without changing states or releasing any locks held. 
957 */ 958 protected Procedure<TEnvironment>[] doExecute(TEnvironment env) 959 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 960 try { 961 updateTimestamp(); 962 if (bypass) { 963 LOG.info("{} bypassed, returning null to finish it", this); 964 return null; 965 } 966 return execute(env); 967 } finally { 968 updateTimestamp(); 969 } 970 } 971 972 /** 973 * Internal method called by the ProcedureExecutor that starts the user-level code rollback(). 974 */ 975 protected void doRollback(TEnvironment env) 976 throws IOException, InterruptedException { 977 try { 978 updateTimestamp(); 979 if (bypass) { 980 LOG.info("{} bypassed, skipping rollback", this); 981 return; 982 } 983 rollback(env); 984 } finally { 985 updateTimestamp(); 986 } 987 } 988 989 final void restoreLock(TEnvironment env) { 990 if (!lockedWhenLoading) { 991 LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this); 992 return; 993 } 994 995 if (isFinished()) { 996 LOG.debug("{} is already finished, skip acquiring lock.", this); 997 return; 998 } 999 1000 if (isBypass()) { 1001 LOG.debug("{} is already bypassed, skip acquiring lock.", this); 1002 return; 1003 } 1004 // this can happen if the parent stores the sub procedures but before it can 1005 // release its lock, the master restarts 1006 if (getState() == ProcedureState.WAITING && !holdLock(env)) { 1007 LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this); 1008 lockedWhenLoading = false; 1009 return; 1010 } 1011 LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this); 1012 LockState state = acquireLock(env); 1013 assert state == LockState.LOCK_ACQUIRED; 1014 } 1015 1016 /** 1017 * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock(). 
1018 */ 1019 final LockState doAcquireLock(TEnvironment env, ProcedureStore store) { 1020 if (waitInitialized(env)) { 1021 return LockState.LOCK_EVENT_WAIT; 1022 } 1023 if (lockedWhenLoading) { 1024 // reset it so we will not consider it anymore 1025 lockedWhenLoading = false; 1026 locked = true; 1027 // Here we return without persist the locked state, as lockedWhenLoading is true means 1028 // that the locked field of the procedure stored in procedure store is true, so we do not need 1029 // to store it again. 1030 return LockState.LOCK_ACQUIRED; 1031 } 1032 LockState state = acquireLock(env); 1033 if (state == LockState.LOCK_ACQUIRED) { 1034 locked = true; 1035 // persist that we have held the lock. This must be done before we actually execute the 1036 // procedure, otherwise when restarting, we may consider the procedure does not have a lock, 1037 // but it may have already done some changes as we have already executed it, and if another 1038 // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do 1039 // not expect that another procedure can be executed in the middle. 1040 store.update(this); 1041 } 1042 return state; 1043 } 1044 1045 /** 1046 * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock(). 1047 */ 1048 final void doReleaseLock(TEnvironment env, ProcedureStore store) { 1049 locked = false; 1050 // persist that we have released the lock. This must be done before we actually release the 1051 // lock. Another procedure may take this lock immediately after we release the lock, and if we 1052 // crash before persist the information that we have already released the lock, then when 1053 // restarting there will be two procedures which both have the lock and cause problems. 
1054 if (getState() != ProcedureState.ROLLEDBACK) { 1055 // If the state is ROLLEDBACK, it means that we have already deleted the procedure from 1056 // procedure store, so do not need to log the release operation any more. 1057 store.update(this); 1058 } 1059 releaseLock(env); 1060 } 1061 1062 @Override 1063 public int compareTo(final Procedure<TEnvironment> other) { 1064 return Long.compare(getProcId(), other.getProcId()); 1065 } 1066 1067 // ========================================================================== 1068 // misc utils 1069 // ========================================================================== 1070 1071 /** 1072 * Get an hashcode for the specified Procedure ID 1073 * @return the hashcode for the specified procId 1074 */ 1075 public static long getProcIdHashCode(long procId) { 1076 long h = procId; 1077 h ^= h >> 16; 1078 h *= 0x85ebca6b; 1079 h ^= h >> 13; 1080 h *= 0xc2b2ae35; 1081 h ^= h >> 16; 1082 return h; 1083 } 1084 1085 /** 1086 * Helper to lookup the root Procedure ID given a specified procedure. 1087 */ 1088 protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures, 1089 Procedure<T> proc) { 1090 while (proc.hasParent()) { 1091 proc = procedures.get(proc.getParentProcId()); 1092 if (proc == null) { 1093 return null; 1094 } 1095 } 1096 return proc.getProcId(); 1097 } 1098 1099 /** 1100 * @param a the first procedure to be compared. 1101 * @param b the second procedure to be compared. 1102 * @return true if the two procedures have the same parent 1103 */ 1104 public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) { 1105 return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId()); 1106 } 1107}