001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.Map; 024import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 025import org.apache.hadoop.hbase.metrics.Counter; 026import org.apache.hadoop.hbase.metrics.Histogram; 027import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 028import org.apache.hadoop.hbase.procedure2.util.StringUtils; 029import org.apache.hadoop.hbase.security.User; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.hadoop.hbase.util.NonceKey; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 037 038/** 039 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate, 040 * stack-indexes, etc. 041 * <p/> 042 * Procedures are run by a {@link ProcedureExecutor} instance. 
They are submitted and then the 043 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may 044 * be called multiple times in the case of failure or a restart, so code must be idempotent. The 045 * return from an execute call is either: null to indicate we are done; ourselves if there is more to 046 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes 047 * our execution. 048 * <p/> 049 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps 050 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the 051 * ProcedureState enum from protos: 052 * <ul> 053 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may 054 * or may not have rolled back yet. Any procedure in FAILED state will eventually be moved to 055 * ROLLEDBACK state.</li> 056 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li> 057 * <li>{@link #isFinished()} As a procedure in FAILED state will be retried forever for rollback, the only 058 * condition under which the scheduler/executor will drop a procedure from further processing is when its 059 * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li> 060 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states 061 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li> 062 * </ul> 063 * NOTE: These states belong to the ProcedureExecutor. Procedure implementations in turn can keep their 064 * own state. This can lead to confusion. Try to keep the two distinct. 065 * <p/> 066 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback 067 * step is supposed to clean up the resources created during the execute() step.
In case of failure 068 * and restart, rollback() may be called multiple times, so again the code must be idempotent. 069 * <p/> 070 * A Procedure can be made to respect a locking regime. It has acquire/release methods as well as an 071 * {@link #hasLock()}. The lock implementation is up to the implementor. If an entity needs to be 072 * locked for the life of a procedure -- not just the calls to execute -- then implementations 073 * should say so with the {@link #holdLock(Object)} method. 074 * <p/> 075 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846), the 076 * implementation is a bit tricky so we add some comments here about it. 077 * <ul> 078 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record 079 * whether we have the lock. We will set it to {@code true} in 080 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in 081 * {@link #doReleaseLock(Object, ProcedureStore)}. The subclasses do not need to manage it any 082 * more.</li> 083 * <li>Also added a locked field in the proto message. When storing, the field will be set according 084 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure 085 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto 086 * message is {@code true}.</li> 087 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling 088 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures 089 * need to wait until master is initialized. So the solution here is that we introduced a new 090 * method called {@link #waitInitialized(Object)} in Procedure, and moved the wait-for-master-initialized 091 * related code from {@link #acquireLock(Object)} to this method.
And we added a restoreLock method 092 * to Procedure: if {@link #lockedWhenLoading} is {@code true}, we will call 093 * {@link #acquireLock(Object)} to get the lock, but not set {@link #locked} to true. And later 094 * when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the 095 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}; if it is 096 * {@code true}, then we just set the {@link #locked} field to true and return, without actually 097 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li> 098 * </ul> 099 * <p/> 100 * Procedures can be suspended or put in wait state with a callback that gets executed on 101 * Procedure-specified timeout. See {@link #setTimeout(int)}, and 102 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure 103 * class for an example usage. 104 * </p> 105 * <p/> 106 * There are hooks for collecting metrics on submit of the procedure and on finish. See 107 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
108 */ 109@InterfaceAudience.Private 110public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> { 111 private static final Logger LOG = LoggerFactory.getLogger(Procedure.class); 112 public static final long NO_PROC_ID = -1; 113 protected static final int NO_TIMEOUT = -1; 114 115 public enum LockState { 116 LOCK_ACQUIRED, // Lock acquired and ready to execute 117 LOCK_YIELD_WAIT, // Lock not acquired, framework needs to yield 118 LOCK_EVENT_WAIT, // Lock not acquired, an event will yield the procedure 119 } 120 121 // Unchanged after initialization 122 private NonceKey nonceKey = null; 123 private String owner = null; 124 private long parentProcId = NO_PROC_ID; 125 private long rootProcId = NO_PROC_ID; 126 private long procId = NO_PROC_ID; 127 private long submittedTime; 128 129 // Runtime state, updated every operation 130 private ProcedureState state = ProcedureState.INITIALIZING; 131 private RemoteProcedureException exception = null; 132 private int[] stackIndexes = null; 133 private int childrenLatch = 0; 134 135 private volatile int timeout = NO_TIMEOUT; 136 private volatile long lastUpdate; 137 138 private volatile byte[] result = null; 139 140 private volatile boolean locked = false; 141 142 private boolean lockedWhenLoading = false; 143 144 /** 145 * Used for override complete of the procedure without actually doing any logic in the procedure. 146 * If bypass is set to true, when executing it will return null when 147 * {@link #doExecute(Object)} is called to finish the procedure and release any locks 148 * it may currently hold. The bypass does cleanup around the Procedure as far as the 149 * Procedure framework is concerned. It does not clean any internal state that the 150 * Procedure's themselves may have set. That is for the Procedures to do themselves 151 * when bypass is called. 
They should override bypass and do their cleanup in the 152 * overridden bypass method (be sure to call the parent bypass to ensure proper 153 * processing). 154 * <p></p>Bypassing a procedure is not like aborting. Aborting a procedure will trigger 155 * a rollback. And since the {@link #abort(Object)} method is overrideable 156 * Some procedures may have chosen to ignore the aborting. 157 */ 158 private volatile boolean bypass = false; 159 160 /** 161 * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to 162 * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the 163 * persistence of the procedure. 164 * <p/> 165 * This is useful when the procedure is in error and you want to retry later. The retry interval 166 * and the number of retries are usually not critical so skip the persistence can save some 167 * resources, and also speed up the restart processing. 168 * <p/> 169 * Notice that this value will be reset to true every time before execution. And when rolling back 170 * we do not test this value. 171 */ 172 private boolean persist = true; 173 174 public boolean isBypass() { 175 return bypass; 176 } 177 178 /** 179 * Set the bypass to true. 180 * Only called in {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now. 181 * DO NOT use this method alone, since we can't just bypass one single procedure. We need to 182 * bypass its ancestor too. If your Procedure has set state, it needs to undo it in here. 183 * @param env Current environment. May be null because of context; e.g. pretty-printing 184 * procedure WALs where there is no 'environment' (and where Procedures that require 185 * an 'environment' won't be run. 
186 */ 187 protected void bypass(TEnvironment env) { 188 this.bypass = true; 189 } 190 191 boolean needPersistence() { 192 return persist; 193 } 194 195 void resetPersistence() { 196 persist = true; 197 } 198 199 protected final void skipPersistence() { 200 persist = false; 201 } 202 203 /** 204 * The main code of the procedure. It must be idempotent since execute() 205 * may be called multiple times in case of machine failure in the middle 206 * of the execution. 207 * @param env the environment passed to the ProcedureExecutor 208 * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the 209 * procedure is done. 210 * @throws ProcedureYieldException the procedure will be added back to the queue and retried 211 * later. 212 * @throws InterruptedException the procedure will be added back to the queue and retried later. 213 * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself 214 * and has set itself up waiting for an external event to wake it back up again. 215 */ 216 protected abstract Procedure<TEnvironment>[] execute(TEnvironment env) 217 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException; 218 219 /** 220 * The code to undo what was done by the execute() code. 221 * It is called when the procedure or one of the sub-procedures failed or an 222 * abort was requested. It should cleanup all the resources created by 223 * the execute() call. The implementation must be idempotent since rollback() 224 * may be called multiple time in case of machine failure in the middle 225 * of the execution. 
226 * @param env the environment passed to the ProcedureExecutor 227 * @throws IOException temporary failure, the rollback will retry later 228 * @throws InterruptedException the procedure will be added back to the queue and retried later 229 */ 230 protected abstract void rollback(TEnvironment env) 231 throws IOException, InterruptedException; 232 233 /** 234 * The abort() call is asynchronous and each procedure must decide how to deal 235 * with it, if they want to be abortable. The simplest implementation 236 * is to have an AtomicBoolean set in the abort() method and then the execute() 237 * will check if the abort flag is set or not. 238 * abort() may be called multiple times from the client, so the implementation 239 * must be idempotent. 240 * 241 * <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification 242 * that allows the procedure implementor abort. 243 */ 244 protected abstract boolean abort(TEnvironment env); 245 246 /** 247 * The user-level code of the procedure may have some state to 248 * persist (e.g. input arguments or current position in the processing state) to 249 * be able to resume on failure. 250 * @param serializer stores the serializable state 251 */ 252 protected abstract void serializeStateData(ProcedureStateSerializer serializer) 253 throws IOException; 254 255 /** 256 * Called on store load to allow the user to decode the previously serialized 257 * state. 258 * @param serializer contains the serialized state 259 */ 260 protected abstract void deserializeStateData(ProcedureStateSerializer serializer) 261 throws IOException; 262 263 /** 264 * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will 265 * call us to determine whether we need to wait for initialization, second, it will call 266 * {@link #acquireLock(Object)} to actually handle the lock for this procedure. 
267 * <p/> 268 * This is because that when master restarts, we need to restore the lock state for all the 269 * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the 270 * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part 271 * of the initialization!), so we need to split the code into two steps, and when restore, we just 272 * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock. 273 * @return true means we need to wait until the environment has been initialized, otherwise true. 274 */ 275 protected boolean waitInitialized(TEnvironment env) { 276 return false; 277 } 278 279 /** 280 * The user should override this method if they need a lock on an Entity. A lock can be anything, 281 * and it is up to the implementor. The Procedure Framework will call this method just before it 282 * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to 283 * execute. 284 * <p/> 285 * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other 286 * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}. 287 * <p/> 288 * Example: in our Master we can execute request in parallel for different tables. We can create 289 * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is 290 * queued waiting that specific table create to happen. 
291 * <p/> 292 * There are 3 LockState: 293 * <ul> 294 * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to 295 * execute.</li> 296 * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should 297 * take care of readding the procedure back to the runnable set for retry</li> 298 * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take 299 * care of readding the procedure back to the runnable set when the lock is available.</li> 300 * </ul> 301 * @return the lock state as described above. 302 */ 303 protected LockState acquireLock(TEnvironment env) { 304 return LockState.LOCK_ACQUIRED; 305 } 306 307 /** 308 * The user should override this method, and release lock if necessary. 309 */ 310 protected void releaseLock(TEnvironment env) { 311 // no-op 312 } 313 314 /** 315 * Used to keep the procedure lock even when the procedure is yielding or suspended. 316 * @return true if the procedure should hold on the lock until completionCleanup() 317 */ 318 protected boolean holdLock(TEnvironment env) { 319 return false; 320 } 321 322 /** 323 * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)} 324 * returns true, the procedure executor will call acquireLock() once and thereafter 325 * not call {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls 326 * release/acquire around each invocation of {@link #execute(Object)}. 327 * @see #holdLock(Object) 328 * @return true if the procedure has the lock, false otherwise. 329 */ 330 public final boolean hasLock() { 331 return locked; 332 } 333 334 /** 335 * Called when the procedure is loaded for replay. 336 * The procedure implementor may use this method to perform some quick 337 * operation before replay. 338 * e.g. failing the procedure if the state on replay may be unknown. 
339 */ 340 protected void beforeReplay(TEnvironment env) { 341 // no-op 342 } 343 344 /** 345 * Called when the procedure is ready to be added to the queue after 346 * the loading/replay operation. 347 */ 348 protected void afterReplay(TEnvironment env) { 349 // no-op 350 } 351 352 /** 353 * Called when the procedure is marked as completed (success or rollback). 354 * The procedure implementor may use this method to cleanup in-memory states. 355 * This operation will not be retried on failure. If a procedure took a lock, 356 * it will have been released when this method runs. 357 */ 358 protected void completionCleanup(TEnvironment env) { 359 // no-op 360 } 361 362 /** 363 * By default, the procedure framework/executor will try to run procedures start to finish. 364 * Return true to make the executor yield between each execution step to 365 * give other procedures a chance to run. 366 * @param env the environment passed to the ProcedureExecutor 367 * @return Return true if the executor should yield on completion of an execution step. 368 * Defaults to return false. 369 */ 370 protected boolean isYieldAfterExecutionStep(TEnvironment env) { 371 return false; 372 } 373 374 /** 375 * By default, the executor will keep the procedure result around util 376 * the eviction TTL is expired. The client can cut down the waiting time 377 * by requesting that the result is removed from the executor. 378 * In case of system started procedure, we can force the executor to auto-ack. 379 * @param env the environment passed to the ProcedureExecutor 380 * @return true if the executor should wait the client ack for the result. 381 * Defaults to return true. 382 */ 383 protected boolean shouldWaitClientAck(TEnvironment env) { 384 return true; 385 } 386 387 /** 388 * Override this method to provide procedure specific counters for submitted count, failed 389 * count and time histogram. 
390 * @param env The environment passed to the procedure executor 391 * @return Container object for procedure related metric 392 */ 393 protected ProcedureMetrics getProcedureMetrics(TEnvironment env) { 394 return null; 395 } 396 397 /** 398 * This function will be called just when procedure is submitted for execution. Override this 399 * method to update the metrics at the beginning of the procedure. The default implementation 400 * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null 401 * {@link ProcedureMetrics}. 402 */ 403 protected void updateMetricsOnSubmit(TEnvironment env) { 404 ProcedureMetrics metrics = getProcedureMetrics(env); 405 if (metrics == null) { 406 return; 407 } 408 409 Counter submittedCounter = metrics.getSubmittedCounter(); 410 if (submittedCounter != null) { 411 submittedCounter.increment(); 412 } 413 } 414 415 /** 416 * This function will be called just after procedure execution is finished. Override this method 417 * to update metrics at the end of the procedure. If {@link #getProcedureMetrics(Object)} returns 418 * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a 419 * time histogram for successfully completed procedures. Increments failed counter for failed 420 * procedures. 421 * <p/> 422 * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including 423 * successfully finished siblings, this function may get called twice in certain cases for certain 424 * procedures. Explore further if this can be called once. 
425 * @param env The environment passed to the procedure executor 426 * @param runtime Runtime of the procedure in milliseconds 427 * @param success true if procedure is completed successfully 428 */ 429 protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) { 430 ProcedureMetrics metrics = getProcedureMetrics(env); 431 if (metrics == null) { 432 return; 433 } 434 435 if (success) { 436 Histogram timeHisto = metrics.getTimeHisto(); 437 if (timeHisto != null) { 438 timeHisto.update(runtime); 439 } 440 } else { 441 Counter failedCounter = metrics.getFailedCounter(); 442 if (failedCounter != null) { 443 failedCounter.increment(); 444 } 445 } 446 } 447 448 @Override 449 public String toString() { 450 // Return the simple String presentation of the procedure. 451 return toStringSimpleSB().toString(); 452 } 453 454 /** 455 * Build the StringBuilder for the simple form of procedure string. 456 * @return the StringBuilder 457 */ 458 protected StringBuilder toStringSimpleSB() { 459 final StringBuilder sb = new StringBuilder(); 460 461 sb.append("pid="); 462 sb.append(getProcId()); 463 464 if (hasParent()) { 465 sb.append(", ppid="); 466 sb.append(getParentProcId()); 467 } 468 469 /* 470 * TODO 471 * Enable later when this is being used. 472 * Currently owner not used. 473 if (hasOwner()) { 474 sb.append(", owner="); 475 sb.append(getOwner()); 476 }*/ 477 478 sb.append(", state="); // pState for Procedure State as opposed to any other kind. 479 toStringState(sb); 480 481 // Only print out locked if actually locked. Most of the time it is not. 
482 if (this.locked) { 483 sb.append(", locked=").append(locked); 484 } 485 486 if (bypass) { 487 sb.append(", bypass=").append(bypass); 488 } 489 490 if (hasException()) { 491 sb.append(", exception=" + getException()); 492 } 493 494 sb.append("; "); 495 toStringClassDetails(sb); 496 497 return sb; 498 } 499 500 /** 501 * Extend the toString() information with more procedure details 502 */ 503 public String toStringDetails() { 504 final StringBuilder sb = toStringSimpleSB(); 505 506 sb.append(" submittedTime="); 507 sb.append(getSubmittedTime()); 508 509 sb.append(", lastUpdate="); 510 sb.append(getLastUpdate()); 511 512 final int[] stackIndices = getStackIndexes(); 513 if (stackIndices != null) { 514 sb.append("\n"); 515 sb.append("stackIndexes="); 516 sb.append(Arrays.toString(stackIndices)); 517 } 518 519 return sb.toString(); 520 } 521 522 protected String toStringClass() { 523 StringBuilder sb = new StringBuilder(); 524 toStringClassDetails(sb); 525 return sb.toString(); 526 } 527 528 /** 529 * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating 530 * generic Procedure State with Procedure particulars. 531 * @param builder Append current {@link ProcedureState} 532 */ 533 protected void toStringState(StringBuilder builder) { 534 builder.append(getState()); 535 } 536 537 /** 538 * Extend the toString() information with the procedure details 539 * e.g. className and parameters 540 * @param builder the string builder to use to append the proc specific information 541 */ 542 protected void toStringClassDetails(StringBuilder builder) { 543 builder.append(getClass().getName()); 544 } 545 546 // ========================================================================== 547 // Those fields are unchanged after initialization. 548 // 549 // Each procedure will get created from the user or during 550 // ProcedureExecutor.start() during the load() phase and then submitted 551 // to the executor. 
these fields will never be changed after initialization 552 // ========================================================================== 553 public long getProcId() { 554 return procId; 555 } 556 557 public boolean hasParent() { 558 return parentProcId != NO_PROC_ID; 559 } 560 561 public long getParentProcId() { 562 return parentProcId; 563 } 564 565 public long getRootProcId() { 566 return rootProcId; 567 } 568 569 public String getProcName() { 570 return toStringClass(); 571 } 572 573 public NonceKey getNonceKey() { 574 return nonceKey; 575 } 576 577 public long getSubmittedTime() { 578 return submittedTime; 579 } 580 581 public String getOwner() { 582 return owner; 583 } 584 585 public boolean hasOwner() { 586 return owner != null; 587 } 588 589 /** 590 * Called by the ProcedureExecutor to assign the ID to the newly created procedure. 591 */ 592 protected void setProcId(long procId) { 593 this.procId = procId; 594 this.submittedTime = EnvironmentEdgeManager.currentTime(); 595 setState(ProcedureState.RUNNABLE); 596 } 597 598 /** 599 * Called by the ProcedureExecutor to assign the parent to the newly created procedure. 600 */ 601 protected void setParentProcId(long parentProcId) { 602 this.parentProcId = parentProcId; 603 } 604 605 protected void setRootProcId(long rootProcId) { 606 this.rootProcId = rootProcId; 607 } 608 609 /** 610 * Called by the ProcedureExecutor to set the value to the newly created procedure. 611 */ 612 protected void setNonceKey(NonceKey nonceKey) { 613 this.nonceKey = nonceKey; 614 } 615 616 public void setOwner(String owner) { 617 this.owner = StringUtils.isEmpty(owner) ? null : owner; 618 } 619 620 public void setOwner(User owner) { 621 assert owner != null : "expected owner to be not null"; 622 setOwner(owner.getShortName()); 623 } 624 625 /** 626 * Called on store load to initialize the Procedure internals after 627 * the creation/deserialization. 
628 */ 629 protected void setSubmittedTime(long submittedTime) { 630 this.submittedTime = submittedTime; 631 } 632 633 // ========================================================================== 634 // runtime state - timeout related 635 // ========================================================================== 636 /** 637 * @param timeout timeout interval in msec 638 */ 639 protected void setTimeout(int timeout) { 640 this.timeout = timeout; 641 } 642 643 public boolean hasTimeout() { 644 return timeout != NO_TIMEOUT; 645 } 646 647 /** 648 * @return the timeout in msec 649 */ 650 public int getTimeout() { 651 return timeout; 652 } 653 654 /** 655 * Called on store load to initialize the Procedure internals after 656 * the creation/deserialization. 657 */ 658 protected void setLastUpdate(long lastUpdate) { 659 this.lastUpdate = lastUpdate; 660 } 661 662 /** 663 * Called by ProcedureExecutor after each time a procedure step is executed. 664 */ 665 protected void updateTimestamp() { 666 this.lastUpdate = EnvironmentEdgeManager.currentTime(); 667 } 668 669 public long getLastUpdate() { 670 return lastUpdate; 671 } 672 673 /** 674 * Timeout of the next timeout. 675 * Called by the ProcedureExecutor if the procedure has timeout set and 676 * the procedure is in the waiting queue. 677 * @return the timestamp of the next timeout. 678 */ 679 protected long getTimeoutTimestamp() { 680 return getLastUpdate() + getTimeout(); 681 } 682 683 // ========================================================================== 684 // runtime state 685 // ========================================================================== 686 /** 687 * @return the time elapsed between the last update and the start time of the procedure. 
688 */ 689 public long elapsedTime() { 690 return getLastUpdate() - getSubmittedTime(); 691 } 692 693 /** 694 * @return the serialized result if any, otherwise null 695 */ 696 public byte[] getResult() { 697 return result; 698 } 699 700 /** 701 * The procedure may leave a "result" on completion. 702 * @param result the serialized result that will be passed to the client 703 */ 704 protected void setResult(byte[] result) { 705 this.result = result; 706 } 707 708 /** 709 * Will only be called when loading procedures from procedure store, where we need to record 710 * whether the procedure has already held a lock. Later we will call 711 * {@link #restoreLock(Object)} to actually acquire the lock. 712 */ 713 final void lockedWhenLoading() { 714 this.lockedWhenLoading = true; 715 } 716 717 /** 718 * Can only be called when restarting, before the procedure actually being executed, as after we 719 * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset 720 * {@link #lockedWhenLoading} to false. 721 * <p/> 722 * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in 723 * front of a queue. 724 */ 725 public boolean isLockedWhenLoading() { 726 return lockedWhenLoading; 727 } 728 729 // ============================================================================================== 730 // Runtime state, updated every operation by the ProcedureExecutor 731 // 732 // There is always 1 thread at the time operating on the state of the procedure. 733 // The ProcedureExecutor may check and set states, or some Procecedure may 734 // update its own state. but no concurrent updates. we use synchronized here 735 // just because the procedure can get scheduled on different executor threads on each step. 736 // ============================================================================================== 737 738 /** 739 * @return true if the procedure is in a RUNNABLE state. 
740 */ 741 public synchronized boolean isRunnable() { 742 return state == ProcedureState.RUNNABLE; 743 } 744 745 public synchronized boolean isInitializing() { 746 return state == ProcedureState.INITIALIZING; 747 } 748 749 /** 750 * @return true if the procedure has failed. It may or may not have rolled back. 751 */ 752 public synchronized boolean isFailed() { 753 return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK; 754 } 755 756 /** 757 * @return true if the procedure is finished successfully. 758 */ 759 public synchronized boolean isSuccess() { 760 return state == ProcedureState.SUCCESS && !hasException(); 761 } 762 763 /** 764 * @return true if the procedure is finished. The Procedure may be completed successfully or 765 * rolledback. 766 */ 767 public synchronized boolean isFinished() { 768 return isSuccess() || state == ProcedureState.ROLLEDBACK; 769 } 770 771 /** 772 * @return true if the procedure is waiting for a child to finish or for an external event. 773 */ 774 public synchronized boolean isWaiting() { 775 switch (state) { 776 case WAITING: 777 case WAITING_TIMEOUT: 778 return true; 779 default: 780 break; 781 } 782 return false; 783 } 784 785 protected synchronized void setState(final ProcedureState state) { 786 this.state = state; 787 updateTimestamp(); 788 } 789 790 public synchronized ProcedureState getState() { 791 return state; 792 } 793 794 protected void setFailure(final String source, final Throwable cause) { 795 setFailure(new RemoteProcedureException(source, cause)); 796 } 797 798 protected synchronized void setFailure(final RemoteProcedureException exception) { 799 this.exception = exception; 800 if (!isFinished()) { 801 setState(ProcedureState.FAILED); 802 } 803 } 804 805 protected void setAbortFailure(final String source, final String msg) { 806 setFailure(source, new ProcedureAbortedException(msg)); 807 } 808 809 /** 810 * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired. 
811 * <p/> 812 * Another usage for this method is to implement retrying. A procedure can set the state to 813 * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a 814 * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a 815 * call {@link #setTimeout(int)} method to set the timeout. And you should also override this 816 * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the 817 * timeout event has been handled. 818 * @return true to let the framework handle the timeout as abort, false in case the procedure 819 * handled the timeout itself. 820 */ 821 protected synchronized boolean setTimeoutFailure(TEnvironment env) { 822 if (state == ProcedureState.WAITING_TIMEOUT) { 823 long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate; 824 setFailure("ProcedureExecutor", 825 new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff))); 826 return true; 827 } 828 return false; 829 } 830 831 public synchronized boolean hasException() { 832 return exception != null; 833 } 834 835 public synchronized RemoteProcedureException getException() { 836 return exception; 837 } 838 839 /** 840 * Called by the ProcedureExecutor on procedure-load to restore the latch state 841 */ 842 protected synchronized void setChildrenLatch(int numChildren) { 843 this.childrenLatch = numChildren; 844 if (LOG.isTraceEnabled()) { 845 LOG.trace("CHILD LATCH INCREMENT SET " + 846 this.childrenLatch, new Throwable(this.toString())); 847 } 848 } 849 850 /** 851 * Called by the ProcedureExecutor on procedure-load to restore the latch state 852 */ 853 protected synchronized void incChildrenLatch() { 854 // TODO: can this be inferred from the stack? I think so... 
855 this.childrenLatch++; 856 if (LOG.isTraceEnabled()) { 857 LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString())); 858 } 859 } 860 861 /** 862 * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. 863 */ 864 private synchronized boolean childrenCountDown() { 865 assert childrenLatch > 0: this; 866 boolean b = --childrenLatch == 0; 867 if (LOG.isTraceEnabled()) { 868 LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString())); 869 } 870 return b; 871 } 872 873 /** 874 * Try to set this procedure into RUNNABLE state. 875 * Succeeds if all subprocedures/children are done. 876 * @return True if we were able to move procedure to RUNNABLE state. 877 */ 878 synchronized boolean tryRunnable() { 879 // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT 880 if (getState() == ProcedureState.WAITING && childrenCountDown()) { 881 setState(ProcedureState.RUNNABLE); 882 return true; 883 } else { 884 return false; 885 } 886 } 887 888 protected synchronized boolean hasChildren() { 889 return childrenLatch > 0; 890 } 891 892 protected synchronized int getChildrenLatch() { 893 return childrenLatch; 894 } 895 896 /** 897 * Called by the RootProcedureState on procedure execution. 898 * Each procedure store its stack-index positions. 
899 */ 900 protected synchronized void addStackIndex(final int index) { 901 if (stackIndexes == null) { 902 stackIndexes = new int[] { index }; 903 } else { 904 int count = stackIndexes.length; 905 stackIndexes = Arrays.copyOf(stackIndexes, count + 1); 906 stackIndexes[count] = index; 907 } 908 } 909 910 protected synchronized boolean removeStackIndex() { 911 if (stackIndexes != null && stackIndexes.length > 1) { 912 stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1); 913 return false; 914 } else { 915 stackIndexes = null; 916 return true; 917 } 918 } 919 920 /** 921 * Called on store load to initialize the Procedure internals after 922 * the creation/deserialization. 923 */ 924 protected synchronized void setStackIndexes(final List<Integer> stackIndexes) { 925 this.stackIndexes = new int[stackIndexes.size()]; 926 for (int i = 0; i < this.stackIndexes.length; ++i) { 927 this.stackIndexes[i] = stackIndexes.get(i); 928 } 929 } 930 931 protected synchronized boolean wasExecuted() { 932 return stackIndexes != null; 933 } 934 935 protected synchronized int[] getStackIndexes() { 936 return stackIndexes; 937 } 938 939 // ========================================================================== 940 // Internal methods - called by the ProcedureExecutor 941 // ========================================================================== 942 943 /** 944 * Internal method called by the ProcedureExecutor that starts the user-level code execute(). 945 * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and 946 * skip out without changing states or releasing any locks held. 
947 */ 948 protected Procedure<TEnvironment>[] doExecute(TEnvironment env) 949 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 950 try { 951 updateTimestamp(); 952 if (bypass) { 953 LOG.info("{} bypassed, returning null to finish it", this); 954 return null; 955 } 956 return execute(env); 957 } finally { 958 updateTimestamp(); 959 } 960 } 961 962 /** 963 * Internal method called by the ProcedureExecutor that starts the user-level code rollback(). 964 */ 965 protected void doRollback(TEnvironment env) 966 throws IOException, InterruptedException { 967 try { 968 updateTimestamp(); 969 if (bypass) { 970 LOG.info("{} bypassed, skipping rollback", this); 971 return; 972 } 973 rollback(env); 974 } finally { 975 updateTimestamp(); 976 } 977 } 978 979 final void restoreLock(TEnvironment env) { 980 if (!lockedWhenLoading) { 981 LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this); 982 return; 983 } 984 985 if (isFinished()) { 986 LOG.debug("{} is already finished, skip acquiring lock.", this); 987 return; 988 } 989 990 if (isBypass()) { 991 LOG.debug("{} is already bypassed, skip acquiring lock.", this); 992 return; 993 } 994 // this can happen if the parent stores the sub procedures but before it can 995 // release its lock, the master restarts 996 if (getState() == ProcedureState.WAITING && !holdLock(env)) { 997 LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this); 998 lockedWhenLoading = false; 999 return; 1000 } 1001 LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this); 1002 LockState state = acquireLock(env); 1003 assert state == LockState.LOCK_ACQUIRED; 1004 } 1005 1006 /** 1007 * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock(). 
1008 */ 1009 final LockState doAcquireLock(TEnvironment env, ProcedureStore store) { 1010 if (waitInitialized(env)) { 1011 return LockState.LOCK_EVENT_WAIT; 1012 } 1013 if (lockedWhenLoading) { 1014 // reset it so we will not consider it anymore 1015 lockedWhenLoading = false; 1016 locked = true; 1017 // Here we return without persist the locked state, as lockedWhenLoading is true means 1018 // that the locked field of the procedure stored in procedure store is true, so we do not need 1019 // to store it again. 1020 return LockState.LOCK_ACQUIRED; 1021 } 1022 LockState state = acquireLock(env); 1023 if (state == LockState.LOCK_ACQUIRED) { 1024 locked = true; 1025 // persist that we have held the lock. This must be done before we actually execute the 1026 // procedure, otherwise when restarting, we may consider the procedure does not have a lock, 1027 // but it may have already done some changes as we have already executed it, and if another 1028 // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do 1029 // not expect that another procedure can be executed in the middle. 1030 store.update(this); 1031 } 1032 return state; 1033 } 1034 1035 /** 1036 * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock(). 1037 */ 1038 final void doReleaseLock(TEnvironment env, ProcedureStore store) { 1039 locked = false; 1040 // persist that we have released the lock. This must be done before we actually release the 1041 // lock. Another procedure may take this lock immediately after we release the lock, and if we 1042 // crash before persist the information that we have already released the lock, then when 1043 // restarting there will be two procedures which both have the lock and cause problems. 
1044 if (getState() != ProcedureState.ROLLEDBACK) { 1045 // If the state is ROLLEDBACK, it means that we have already deleted the procedure from 1046 // procedure store, so do not need to log the release operation any more. 1047 store.update(this); 1048 } 1049 releaseLock(env); 1050 } 1051 1052 @Override 1053 public int compareTo(final Procedure<TEnvironment> other) { 1054 return Long.compare(getProcId(), other.getProcId()); 1055 } 1056 1057 // ========================================================================== 1058 // misc utils 1059 // ========================================================================== 1060 1061 /** 1062 * Get an hashcode for the specified Procedure ID 1063 * @return the hashcode for the specified procId 1064 */ 1065 public static long getProcIdHashCode(long procId) { 1066 long h = procId; 1067 h ^= h >> 16; 1068 h *= 0x85ebca6b; 1069 h ^= h >> 13; 1070 h *= 0xc2b2ae35; 1071 h ^= h >> 16; 1072 return h; 1073 } 1074 1075 /** 1076 * Helper to lookup the root Procedure ID given a specified procedure. 1077 */ 1078 protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures, 1079 Procedure<T> proc) { 1080 while (proc.hasParent()) { 1081 proc = procedures.get(proc.getParentProcId()); 1082 if (proc == null) { 1083 return null; 1084 } 1085 } 1086 return proc.getProcId(); 1087 } 1088 1089 /** 1090 * @param a the first procedure to be compared. 1091 * @param b the second procedure to be compared. 1092 * @return true if the two procedures have the same parent 1093 */ 1094 public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) { 1095 return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId()); 1096 } 1097}