001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.ThreadLocalRandom; 025import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 026import org.apache.hadoop.hbase.metrics.Counter; 027import org.apache.hadoop.hbase.metrics.Histogram; 028import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 029import org.apache.hadoop.hbase.procedure2.util.StringUtils; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 032import org.apache.hadoop.hbase.util.NonceKey; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 039 040/** 041 * Base Procedure class responsible for Procedure Metadata; e.g. state, submittedTime, lastUpdate, 042 * stack-indexes, etc. 
043 * <p/> 044 * Procedures are run by a {@link ProcedureExecutor} instance. They are submitted and then the 045 * ProcedureExecutor keeps calling {@link #execute(Object)} until the Procedure is done. Execute may 046 * be called multiple times in the case of failure or a restart, so code must be idempotent. The 047 * return from an execute call is either: null to indicate we are done; ourself if there is more to 048 * do; or, a set of sub-procedures that need to be run to completion before the framework resumes 049 * our execution. 050 * <p/> 051 * The ProcedureExecutor keeps its notion of Procedure State in the Procedure itself; e.g. it stamps 052 * the Procedure as INITIALIZING, RUNNABLE, SUCCESS, etc. Here are some of the States defined in the 053 * ProcedureState enum from protos: 054 * <ul> 055 * <li>{@link #isFailed()} A procedure has executed at least once and has failed. The procedure may 056 * or may not have rolled back yet. Any procedure in FAILED state will be eventually moved to 057 * ROLLEDBACK state.</li> 058 * <li>{@link #isSuccess()} A procedure is completed successfully without exception.</li> 059 * <li>{@link #isFinished()} As a procedure in FAILED state will be tried forever for rollback, only 060 * condition when scheduler/ executor will drop procedure from further processing is when procedure 061 * state is ROLLEDBACK or isSuccess() returns true. This is a terminal state of the procedure.</li> 062 * <li>{@link #isWaiting()} - Procedure is in one of the two waiting states 063 * ({@link ProcedureState#WAITING}, {@link ProcedureState#WAITING_TIMEOUT}).</li> 064 * </ul> 065 * NOTE: These states are of the ProcedureExecutor. Procedure implementations in turn can keep their 066 * own state. This can lead to confusion. Try to keep the two distinct. 067 * <p/> 068 * rollback() is called when the procedure or one of the sub-procedures has failed. The rollback 069 * step is supposed to cleanup the resources created during the execute() step. 
* In case of failure and restart, rollback() may be called multiple times, so again the code must
 * be idempotent.
 * <p/>
 * Procedures can be made to respect a locking regime. A Procedure has acquire/release methods as
 * well as a {@link #hasLock()} method. The lock implementation is up to the implementor. If an
 * entity needs to be locked for the life of a procedure -- not just the calls to execute -- then
 * implementations should say so with the {@link #holdLock(Object)} method.
 * <p/>
 * And since we need to restore the lock when restarting to keep the logic correct (HBASE-20846),
 * the implementation is a bit tricky, so we add some comments here about it.
 * <ul>
 * <li>Make {@link #hasLock()} method final, and add a {@link #locked} field in Procedure to record
 * whether we have the lock. We will set it to {@code true} in
 * {@link #doAcquireLock(Object, ProcedureStore)} and to {@code false} in
 * {@link #doReleaseLock(Object, ProcedureStore)}. The sub classes do not need to manage it any
 * more.</li>
 * <li>Also added a locked field in the proto message. When storing, the field will be set according
 * to the return value of {@link #hasLock()}. And when loading, there is a new field in Procedure
 * called {@link #lockedWhenLoading}. We will set it to {@code true} if the locked field in proto
 * message is {@code true}.</li>
 * <li>The reason why we can not set the {@link #locked} field directly to {@code true} by calling
 * {@link #doAcquireLock(Object, ProcedureStore)} is that, during initialization, most procedures
 * need to wait until master is initialized. So the solution here is that, we introduced a new
 * method called {@link #waitInitialized(Object)} in Procedure, and moved the wait master
 * initialized related code from {@link #acquireLock(Object)} to this method.
* And we added a restoreLock method to Procedure: if {@link #lockedWhenLoading} is {@code true},
 * we will call {@link #acquireLock(Object)} to get the lock, but do not set {@link #locked} to
 * true. And later when we call {@link #doAcquireLock(Object, ProcedureStore)} and pass the
 * {@link #waitInitialized(Object)} check, we will test {@link #lockedWhenLoading}; if it is
 * {@code true}, then we just set the {@link #locked} field to true and return, without actually
 * calling the {@link #acquireLock(Object)} method since we have already called it once.</li>
 * </ul>
 * <p/>
 * Procedures can be suspended or put in wait state with a callback that gets executed on
 * Procedure-specified timeout. See {@link #setTimeout(int)}, and
 * {@link #setTimeoutFailure(Object)}. See TestProcedureEvents and the TestTimeoutEventProcedure
 * class for an example usage.
 * <p/>
 * There are hooks for collecting metrics on submit of the procedure and on finish. See
 * {@link #updateMetricsOnSubmit(Object)} and {@link #updateMetricsOnFinish(Object, long, boolean)}.
110 */ 111@InterfaceAudience.Private 112public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TEnvironment>> { 113 private static final Logger LOG = LoggerFactory.getLogger(Procedure.class); 114 public static final long NO_PROC_ID = -1; 115 protected static final int NO_TIMEOUT = -1; 116 117 public enum LockState { 118 LOCK_ACQUIRED, // Lock acquired and ready to execute 119 LOCK_YIELD_WAIT, // Lock not acquired, framework needs to yield 120 LOCK_EVENT_WAIT, // Lock not acquired, an event will yield the procedure 121 } 122 123 // Unchanged after initialization 124 private NonceKey nonceKey = null; 125 private String owner = null; 126 private long parentProcId = NO_PROC_ID; 127 private long rootProcId = NO_PROC_ID; 128 private long procId = NO_PROC_ID; 129 private long submittedTime; 130 private boolean isCriticalSystemTable; 131 132 // Runtime state, updated every operation 133 private ProcedureState state = ProcedureState.INITIALIZING; 134 private RemoteProcedureException exception = null; 135 private int[] stackIndexes = null; 136 private int childrenLatch = 0; 137 // since we do not always maintain stackIndexes if the root procedure does not support rollback, 138 // we need a separated flag to indicate whether a procedure was executed 139 private boolean wasExecuted; 140 141 private volatile int timeout = NO_TIMEOUT; 142 private volatile long lastUpdate; 143 144 private volatile byte[] result = null; 145 146 private volatile boolean locked = false; 147 148 private boolean lockedWhenLoading = false; 149 150 /** 151 * Used for override complete of the procedure without actually doing any logic in the procedure. 152 * If bypass is set to true, when executing it will return null when {@link #doExecute(Object)} is 153 * called to finish the procedure and release any locks it may currently hold. The bypass does 154 * cleanup around the Procedure as far as the Procedure framework is concerned. 
It does not clean 155 * any internal state that the Procedure's themselves may have set. That is for the Procedures to 156 * do themselves when bypass is called. They should override bypass and do their cleanup in the 157 * overridden bypass method (be sure to call the parent bypass to ensure proper processing). 158 * <p> 159 * </p> 160 * Bypassing a procedure is not like aborting. Aborting a procedure will trigger a rollback. And 161 * since the {@link #abort(Object)} method is overrideable Some procedures may have chosen to 162 * ignore the aborting. 163 */ 164 private volatile boolean bypass = false; 165 166 /** 167 * Indicate whether we need to persist the procedure to ProcedureStore after execution. Default to 168 * true, and the implementation can all {@link #skipPersistence()} to let the framework skip the 169 * persistence of the procedure. 170 * <p/> 171 * This is useful when the procedure is in error and you want to retry later. The retry interval 172 * and the number of retries are usually not critical so skip the persistence can save some 173 * resources, and also speed up the restart processing. 174 * <p/> 175 * Notice that this value will be reset to true every time before execution. And when rolling back 176 * we do not test this value. 177 */ 178 private boolean persist = true; 179 180 public boolean isBypass() { 181 return bypass; 182 } 183 184 /** 185 * Set the bypass to true. Only called in 186 * {@link ProcedureExecutor#bypassProcedure(long, long, boolean, boolean)} for now. DO NOT use 187 * this method alone, since we can't just bypass one single procedure. We need to bypass its 188 * ancestor too. If your Procedure has set state, it needs to undo it in here. 189 * @param env Current environment. May be null because of context; e.g. pretty-printing procedure 190 * WALs where there is no 'environment' (and where Procedures that require an 191 * 'environment' won't be run. 
192 */ 193 protected void bypass(TEnvironment env) { 194 this.bypass = true; 195 } 196 197 boolean needPersistence() { 198 return persist; 199 } 200 201 void resetPersistence() { 202 persist = true; 203 } 204 205 protected final void skipPersistence() { 206 persist = false; 207 } 208 209 /** 210 * The main code of the procedure. It must be idempotent since execute() may be called multiple 211 * times in case of machine failure in the middle of the execution. 212 * @param env the environment passed to the ProcedureExecutor 213 * @return a set of sub-procedures to run or ourselves if there is more work to do or null if the 214 * procedure is done. 215 * @throws ProcedureYieldException the procedure will be added back to the queue and retried 216 * later. 217 * @throws InterruptedException the procedure will be added back to the queue and retried 218 * later. 219 * @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself 220 * and has set itself up waiting for an external event to wake 221 * it back up again. 222 */ 223 protected abstract Procedure<TEnvironment>[] execute(TEnvironment env) 224 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException; 225 226 /** 227 * The code to undo what was done by the execute() code. It is called when the procedure or one of 228 * the sub-procedures failed or an abort was requested. It should cleanup all the resources 229 * created by the execute() call. The implementation must be idempotent since rollback() may be 230 * called multiple time in case of machine failure in the middle of the execution. 
231 * @param env the environment passed to the ProcedureExecutor 232 * @throws IOException temporary failure, the rollback will retry later 233 * @throws InterruptedException the procedure will be added back to the queue and retried later 234 */ 235 protected abstract void rollback(TEnvironment env) throws IOException, InterruptedException; 236 237 /** 238 * The abort() call is asynchronous and each procedure must decide how to deal with it, if they 239 * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the 240 * abort() method and then the execute() will check if the abort flag is set or not. abort() may 241 * be called multiple times from the client, so the implementation must be idempotent. 242 * <p> 243 * NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the 244 * procedure implementor abort. 245 */ 246 protected abstract boolean abort(TEnvironment env); 247 248 /** 249 * The user-level code of the procedure may have some state to persist (e.g. input arguments or 250 * current position in the processing state) to be able to resume on failure. 251 * @param serializer stores the serializable state 252 */ 253 protected abstract void serializeStateData(ProcedureStateSerializer serializer) 254 throws IOException; 255 256 /** 257 * Called on store load to allow the user to decode the previously serialized state. 258 * @param serializer contains the serialized state 259 */ 260 protected abstract void deserializeStateData(ProcedureStateSerializer serializer) 261 throws IOException; 262 263 /** 264 * The {@link #doAcquireLock(Object, ProcedureStore)} will be split into two steps, first, it will 265 * call us to determine whether we need to wait for initialization, second, it will call 266 * {@link #acquireLock(Object)} to actually handle the lock for this procedure. 
267 * <p/> 268 * This is because that when master restarts, we need to restore the lock state for all the 269 * procedures to not break the semantic if {@link #holdLock(Object)} is true. But the 270 * {@link ProcedureExecutor} will be started before the master finish initialization(as it is part 271 * of the initialization!), so we need to split the code into two steps, and when restore, we just 272 * restore the lock part and ignore the waitInitialized part. Otherwise there will be dead lock. 273 * @return true means we need to wait until the environment has been initialized, otherwise true. 274 */ 275 protected boolean waitInitialized(TEnvironment env) { 276 return false; 277 } 278 279 /** 280 * The user should override this method if they need a lock on an Entity. A lock can be anything, 281 * and it is up to the implementor. The Procedure Framework will call this method just before it 282 * invokes {@link #execute(Object)}. It calls {@link #releaseLock(Object)} after the call to 283 * execute. 284 * <p/> 285 * If you need to hold the lock for the life of the Procedure -- i.e. you do not want any other 286 * Procedure interfering while this Procedure is running, see {@link #holdLock(Object)}. 287 * <p/> 288 * Example: in our Master we can execute request in parallel for different tables. We can create 289 * t1 and create t2 and these creates can be executed at the same time. Anything else on t1/t2 is 290 * queued waiting that specific table create to happen. 
291 * <p/> 292 * There are 3 LockState: 293 * <ul> 294 * <li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to 295 * execute.</li> 296 * <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework should 297 * take care of readding the procedure back to the runnable set for retry</li> 298 * <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take 299 * care of readding the procedure back to the runnable set when the lock is available.</li> 300 * </ul> 301 * @return the lock state as described above. 302 */ 303 protected LockState acquireLock(TEnvironment env) { 304 return LockState.LOCK_ACQUIRED; 305 } 306 307 /** 308 * The user should override this method, and release lock if necessary. 309 */ 310 protected void releaseLock(TEnvironment env) { 311 // no-op 312 } 313 314 /** 315 * Used to keep the procedure lock even when the procedure is yielding or suspended. 316 * @return true if the procedure should hold on the lock until completionCleanup() 317 */ 318 protected boolean holdLock(TEnvironment env) { 319 return false; 320 } 321 322 /** 323 * This is used in conjunction with {@link #holdLock(Object)}. If {@link #holdLock(Object)} 324 * returns true, the procedure executor will call acquireLock() once and thereafter not call 325 * {@link #releaseLock(Object)} until the Procedure is done (Normally, it calls release/acquire 326 * around each invocation of {@link #execute(Object)}. 327 * @see #holdLock(Object) 328 * @return true if the procedure has the lock, false otherwise. 329 */ 330 public final boolean hasLock() { 331 return locked; 332 } 333 334 /** 335 * Called when the procedure is loaded for replay. The procedure implementor may use this method 336 * to perform some quick operation before replay. e.g. failing the procedure if the state on 337 * replay may be unknown. 
338 */ 339 protected void beforeReplay(TEnvironment env) { 340 // no-op 341 } 342 343 /** 344 * Called when the procedure is ready to be added to the queue after the loading/replay operation. 345 */ 346 protected void afterReplay(TEnvironment env) { 347 // no-op 348 } 349 350 /** 351 * Called before we call the execute method of this procedure, but after we acquire the execution 352 * lock and procedure scheduler lock. 353 */ 354 protected void beforeExec(TEnvironment env) throws ProcedureSuspendedException { 355 // no-op 356 } 357 358 /** 359 * Called after we call the execute method of this procedure, and also after we initialize all the 360 * sub procedures and persist the the state if persistence is needed. 361 * <p> 362 * This is for doing some hooks after we initialize the sub procedures. See HBASE-29259 for more 363 * details on why we can not release the region lock inside the execute method. 364 */ 365 protected void afterExec(TEnvironment env) { 366 // no-op 367 } 368 369 /** 370 * Called when the procedure is marked as completed (success or rollback). The procedure 371 * implementor may use this method to cleanup in-memory states. This operation will not be retried 372 * on failure. If a procedure took a lock, it will have been released when this method runs. 373 */ 374 protected void completionCleanup(TEnvironment env) { 375 // no-op 376 } 377 378 /** 379 * By default, the procedure framework/executor will try to run procedures start to finish. Return 380 * true to make the executor yield between each execution step to give other procedures a chance 381 * to run. 382 * @param env the environment passed to the ProcedureExecutor 383 * @return Return true if the executor should yield on completion of an execution step. Defaults 384 * to return false. 
385 */ 386 protected boolean isYieldAfterExecutionStep(TEnvironment env) { 387 return false; 388 } 389 390 /** 391 * By default, the executor will keep the procedure result around util the eviction TTL is 392 * expired. The client can cut down the waiting time by requesting that the result is removed from 393 * the executor. In case of system started procedure, we can force the executor to auto-ack. 394 * @param env the environment passed to the ProcedureExecutor 395 * @return true if the executor should wait the client ack for the result. Defaults to return 396 * true. 397 */ 398 protected boolean shouldWaitClientAck(TEnvironment env) { 399 return true; 400 } 401 402 /** 403 * Override this method to provide procedure specific counters for submitted count, failed count 404 * and time histogram. 405 * @param env The environment passed to the procedure executor 406 * @return Container object for procedure related metric 407 */ 408 protected ProcedureMetrics getProcedureMetrics(TEnvironment env) { 409 return null; 410 } 411 412 /** 413 * This function will be called just when procedure is submitted for execution. Override this 414 * method to update the metrics at the beginning of the procedure. The default implementation 415 * updates submitted counter if {@link #getProcedureMetrics(Object)} returns non-null 416 * {@link ProcedureMetrics}. 417 */ 418 protected void updateMetricsOnSubmit(TEnvironment env) { 419 ProcedureMetrics metrics = getProcedureMetrics(env); 420 if (metrics == null) { 421 return; 422 } 423 424 Counter submittedCounter = metrics.getSubmittedCounter(); 425 if (submittedCounter != null) { 426 submittedCounter.increment(); 427 } 428 } 429 430 /** 431 * This function will be called just after procedure execution is finished. Override this method 432 * to update metrics at the end of the procedure. 
If {@link #getProcedureMetrics(Object)} returns 433 * non-null {@link ProcedureMetrics}, the default implementation adds runtime of a procedure to a 434 * time histogram for successfully completed procedures. Increments failed counter for failed 435 * procedures. 436 * <p/> 437 * TODO: As any of the sub-procedures on failure rolls back all procedures in the stack, including 438 * successfully finished siblings, this function may get called twice in certain cases for certain 439 * procedures. Explore further if this can be called once. 440 * @param env The environment passed to the procedure executor 441 * @param runtime Runtime of the procedure in milliseconds 442 * @param success true if procedure is completed successfully 443 */ 444 protected void updateMetricsOnFinish(TEnvironment env, long runtime, boolean success) { 445 ProcedureMetrics metrics = getProcedureMetrics(env); 446 if (metrics == null) { 447 return; 448 } 449 450 if (success) { 451 Histogram timeHisto = metrics.getTimeHisto(); 452 if (timeHisto != null) { 453 timeHisto.update(runtime); 454 } 455 } else { 456 Counter failedCounter = metrics.getFailedCounter(); 457 if (failedCounter != null) { 458 failedCounter.increment(); 459 } 460 } 461 } 462 463 @Override 464 public String toString() { 465 // Return the simple String presentation of the procedure. 466 return toStringSimpleSB().toString(); 467 } 468 469 /** 470 * Build the StringBuilder for the simple form of procedure string. 471 * @return the StringBuilder 472 */ 473 protected StringBuilder toStringSimpleSB() { 474 final StringBuilder sb = new StringBuilder(); 475 476 sb.append("pid="); 477 sb.append(getProcId()); 478 479 if (hasParent()) { 480 sb.append(", ppid="); 481 sb.append(getParentProcId()); 482 } 483 484 /* 485 * TODO Enable later when this is being used. Currently owner not used. 
if (hasOwner()) { 486 * sb.append(", owner="); sb.append(getOwner()); } 487 */ 488 489 sb.append(", state="); // pState for Procedure State as opposed to any other kind. 490 toStringState(sb); 491 492 sb.append(", hasLock=").append(locked); 493 494 if (bypass) { 495 sb.append(", bypass=").append(bypass); 496 } 497 498 if (hasException()) { 499 sb.append(", exception=" + getException()); 500 } 501 502 sb.append("; "); 503 toStringClassDetails(sb); 504 505 return sb; 506 } 507 508 /** 509 * Extend the toString() information with more procedure details 510 */ 511 public String toStringDetails() { 512 final StringBuilder sb = toStringSimpleSB(); 513 514 sb.append(" submittedTime="); 515 sb.append(getSubmittedTime()); 516 517 sb.append(", lastUpdate="); 518 sb.append(getLastUpdate()); 519 520 final int[] stackIndices = getStackIndexes(); 521 if (stackIndices != null) { 522 sb.append("\n"); 523 sb.append("stackIndexes="); 524 sb.append(Arrays.toString(stackIndices)); 525 } 526 527 return sb.toString(); 528 } 529 530 protected String toStringClass() { 531 StringBuilder sb = new StringBuilder(); 532 toStringClassDetails(sb); 533 return sb.toString(); 534 } 535 536 /** 537 * Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating 538 * generic Procedure State with Procedure particulars. 539 * @param builder Append current {@link ProcedureState} 540 */ 541 protected void toStringState(StringBuilder builder) { 542 builder.append(getState()); 543 } 544 545 /** 546 * Extend the toString() information with the procedure details e.g. className and parameters 547 * @param builder the string builder to use to append the proc specific information 548 */ 549 protected void toStringClassDetails(StringBuilder builder) { 550 builder.append(getClass().getName()); 551 } 552 553 // ========================================================================== 554 // Those fields are unchanged after initialization. 
555 // 556 // Each procedure will get created from the user or during 557 // ProcedureExecutor.start() during the load() phase and then submitted 558 // to the executor. these fields will never be changed after initialization 559 // ========================================================================== 560 public long getProcId() { 561 return procId; 562 } 563 564 public boolean hasParent() { 565 return parentProcId != NO_PROC_ID; 566 } 567 568 public long getParentProcId() { 569 return parentProcId; 570 } 571 572 public long getRootProcId() { 573 return rootProcId; 574 } 575 576 public String getProcName() { 577 return toStringClass(); 578 } 579 580 public NonceKey getNonceKey() { 581 return nonceKey; 582 } 583 584 public long getSubmittedTime() { 585 return submittedTime; 586 } 587 588 public String getOwner() { 589 return owner; 590 } 591 592 public boolean hasOwner() { 593 return owner != null; 594 } 595 596 /** 597 * Called by the ProcedureExecutor to assign the ID to the newly created procedure. 598 */ 599 protected void setProcId(long procId) { 600 this.procId = procId; 601 this.submittedTime = EnvironmentEdgeManager.currentTime(); 602 setState(ProcedureState.RUNNABLE); 603 } 604 605 /** 606 * Called by the ProcedureExecutor to assign the parent to the newly created procedure. 607 */ 608 protected void setParentProcId(long parentProcId) { 609 this.parentProcId = parentProcId; 610 } 611 612 public void setCriticalSystemTable(boolean isCriticalSystemTable) { 613 this.isCriticalSystemTable = isCriticalSystemTable; 614 } 615 616 public boolean isCriticalSystemTable() { 617 return isCriticalSystemTable; 618 } 619 620 protected void setRootProcId(long rootProcId) { 621 this.rootProcId = rootProcId; 622 } 623 624 /** 625 * Called by the ProcedureExecutor to set the value to the newly created procedure. 
626 */ 627 protected void setNonceKey(NonceKey nonceKey) { 628 this.nonceKey = nonceKey; 629 } 630 631 public void setOwner(String owner) { 632 this.owner = StringUtils.isEmpty(owner) ? null : owner; 633 } 634 635 public void setOwner(User owner) { 636 assert owner != null : "expected owner to be not null"; 637 setOwner(owner.getShortName()); 638 } 639 640 /** 641 * Called on store load to initialize the Procedure internals after the creation/deserialization. 642 */ 643 protected void setSubmittedTime(long submittedTime) { 644 this.submittedTime = submittedTime; 645 } 646 647 // ========================================================================== 648 // runtime state - timeout related 649 // ========================================================================== 650 /** 651 * @param timeout timeout interval in msec 652 */ 653 protected void setTimeout(int timeout) { 654 this.timeout = timeout; 655 } 656 657 public boolean hasTimeout() { 658 return timeout != NO_TIMEOUT; 659 } 660 661 /** Returns the timeout in msec */ 662 public int getTimeout() { 663 return timeout; 664 } 665 666 /** 667 * Called on store load to initialize the Procedure internals after the creation/deserialization. 668 */ 669 protected void setLastUpdate(long lastUpdate) { 670 this.lastUpdate = lastUpdate; 671 } 672 673 /** 674 * Called by ProcedureExecutor after each time a procedure step is executed. 675 */ 676 protected void updateTimestamp() { 677 this.lastUpdate = EnvironmentEdgeManager.currentTime(); 678 } 679 680 public long getLastUpdate() { 681 return lastUpdate; 682 } 683 684 /** 685 * Timeout of the next timeout. Called by the ProcedureExecutor if the procedure has timeout set 686 * and the procedure is in the waiting queue. 687 * @return the timestamp of the next timeout. 
688 */ 689 protected long getTimeoutTimestamp() { 690 return getLastUpdate() + getTimeout(); 691 } 692 693 // ========================================================================== 694 // runtime state 695 // ========================================================================== 696 /** Returns the time elapsed between the last update and the start time of the procedure. */ 697 public long elapsedTime() { 698 return getLastUpdate() - getSubmittedTime(); 699 } 700 701 /** Returns the serialized result if any, otherwise null */ 702 public byte[] getResult() { 703 return result; 704 } 705 706 /** 707 * The procedure may leave a "result" on completion. 708 * @param result the serialized result that will be passed to the client 709 */ 710 protected void setResult(byte[] result) { 711 this.result = result; 712 } 713 714 /** 715 * Will only be called when loading procedures from procedure store, where we need to record 716 * whether the procedure has already held a lock. Later we will call {@link #restoreLock(Object)} 717 * to actually acquire the lock. 718 */ 719 final void lockedWhenLoading() { 720 this.lockedWhenLoading = true; 721 } 722 723 /** 724 * Can only be called when restarting, before the procedure actually being executed, as after we 725 * actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset 726 * {@link #lockedWhenLoading} to false. 727 * <p/> 728 * Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in 729 * front of a queue. 730 */ 731 public boolean isLockedWhenLoading() { 732 return lockedWhenLoading; 733 } 734 735 // ============================================================================================== 736 // Runtime state, updated every operation by the ProcedureExecutor 737 // 738 // There is always 1 thread at the time operating on the state of the procedure. 
739 // The ProcedureExecutor may check and set states, or some Procecedure may 740 // update its own state. but no concurrent updates. we use synchronized here 741 // just because the procedure can get scheduled on different executor threads on each step. 742 // ============================================================================================== 743 744 /** Returns true if the procedure is in a RUNNABLE state. */ 745 public synchronized boolean isRunnable() { 746 return state == ProcedureState.RUNNABLE; 747 } 748 749 public synchronized boolean isInitializing() { 750 return state == ProcedureState.INITIALIZING; 751 } 752 753 /** Returns true if the procedure has failed. It may or may not have rolled back. */ 754 public synchronized boolean isFailed() { 755 return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK; 756 } 757 758 /** Returns true if the procedure is finished successfully. */ 759 public synchronized boolean isSuccess() { 760 return state == ProcedureState.SUCCESS && !hasException(); 761 } 762 763 /** 764 * @return true if the procedure is finished. The Procedure may be completed successfully or 765 * rolledback. 766 */ 767 public synchronized boolean isFinished() { 768 return isSuccess() || state == ProcedureState.ROLLEDBACK; 769 } 770 771 /** Returns true if the procedure is waiting for a child to finish or for an external event. 
*/ 772 public synchronized boolean isWaiting() { 773 switch (state) { 774 case WAITING: 775 case WAITING_TIMEOUT: 776 return true; 777 default: 778 break; 779 } 780 return false; 781 } 782 783 protected synchronized void setState(final ProcedureState state) { 784 this.state = state; 785 updateTimestamp(); 786 } 787 788 public synchronized ProcedureState getState() { 789 return state; 790 } 791 792 protected void setFailure(final String source, final Throwable cause) { 793 setFailure(new RemoteProcedureException(source, cause)); 794 } 795 796 protected synchronized void setFailure(final RemoteProcedureException exception) { 797 this.exception = exception; 798 if (!isFinished()) { 799 setState(ProcedureState.FAILED); 800 } 801 } 802 803 protected void setAbortFailure(final String source, final String msg) { 804 setFailure(source, new ProcedureAbortedException(msg)); 805 } 806 807 /** 808 * Called by the ProcedureExecutor when the timeout set by setTimeout() is expired. 809 * <p/> 810 * Another usage for this method is to implement retrying. A procedure can set the state to 811 * {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a 812 * {@link ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a 813 * call {@link #setTimeout(int)} method to set the timeout. And you should also override this 814 * method to wake up the procedure, and also return false to tell the ProcedureExecutor that the 815 * timeout event has been handled. 816 * @return true to let the framework handle the timeout as abort, false in case the procedure 817 * handled the timeout itself. 
818 */ 819 protected synchronized boolean setTimeoutFailure(TEnvironment env) { 820 if (state == ProcedureState.WAITING_TIMEOUT) { 821 long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate; 822 setFailure("ProcedureExecutor", 823 new TimeoutIOException("Operation timed out after " + StringUtils.humanTimeDiff(timeDiff))); 824 return true; 825 } 826 return false; 827 } 828 829 public synchronized boolean hasException() { 830 return exception != null; 831 } 832 833 public synchronized RemoteProcedureException getException() { 834 return exception; 835 } 836 837 /** 838 * Called by the ProcedureExecutor on procedure-load to restore the latch state 839 */ 840 protected synchronized void setChildrenLatch(int numChildren) { 841 this.childrenLatch = numChildren; 842 if (LOG.isTraceEnabled()) { 843 LOG.trace("CHILD LATCH INCREMENT SET " + this.childrenLatch, new Throwable(this.toString())); 844 } 845 } 846 847 /** 848 * Called by the ProcedureExecutor on procedure-load to restore the latch state 849 */ 850 protected synchronized void incChildrenLatch() { 851 // TODO: can this be inferred from the stack? I think so... 852 this.childrenLatch++; 853 if (LOG.isTraceEnabled()) { 854 LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString())); 855 } 856 } 857 858 /** 859 * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. 860 */ 861 private synchronized boolean childrenCountDown() { 862 assert childrenLatch > 0 : this; 863 boolean b = --childrenLatch == 0; 864 if (LOG.isTraceEnabled()) { 865 LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString())); 866 } 867 return b; 868 } 869 870 /** 871 * Try to set this procedure into RUNNABLE state. Succeeds if all subprocedures/children are done. 872 * @return True if we were able to move procedure to RUNNABLE state. 
873 */ 874 synchronized boolean tryRunnable() { 875 // Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT 876 if (getState() == ProcedureState.WAITING && childrenCountDown()) { 877 setState(ProcedureState.RUNNABLE); 878 return true; 879 } else { 880 return false; 881 } 882 } 883 884 protected synchronized boolean hasChildren() { 885 return childrenLatch > 0; 886 } 887 888 protected synchronized int getChildrenLatch() { 889 return childrenLatch; 890 } 891 892 /** 893 * Called by the RootProcedureState on procedure execution. Each procedure store its stack-index 894 * positions. 895 */ 896 protected synchronized void addStackIndex(final int index) { 897 if (stackIndexes == null) { 898 stackIndexes = new int[] { index }; 899 } else { 900 int count = stackIndexes.length; 901 stackIndexes = Arrays.copyOf(stackIndexes, count + 1); 902 stackIndexes[count] = index; 903 } 904 wasExecuted = true; 905 } 906 907 protected synchronized boolean removeStackIndex() { 908 if (stackIndexes != null && stackIndexes.length > 1) { 909 stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1); 910 return false; 911 } else { 912 stackIndexes = null; 913 return true; 914 } 915 } 916 917 /** 918 * Called on store load to initialize the Procedure internals after the creation/deserialization. 
919 */ 920 protected synchronized void setStackIndexes(final List<Integer> stackIndexes) { 921 this.stackIndexes = new int[stackIndexes.size()]; 922 for (int i = 0; i < this.stackIndexes.length; ++i) { 923 this.stackIndexes[i] = stackIndexes.get(i); 924 } 925 // for backward compatible, where a procedure is serialized before we added the executed flag, 926 // the flag will be false so we need to set the wasExecuted flag here 927 this.wasExecuted = true; 928 } 929 930 protected synchronized void setExecuted() { 931 this.wasExecuted = true; 932 } 933 934 public synchronized boolean wasExecuted() { 935 return wasExecuted; 936 } 937 938 protected synchronized int[] getStackIndexes() { 939 return stackIndexes; 940 } 941 942 /** 943 * Return whether the procedure supports rollback. If the procedure does not support rollback, we 944 * can skip the rollback state management which could increase the performance. See HBASE-28210 945 * and HBASE-28212. 946 */ 947 protected boolean isRollbackSupported() { 948 return true; 949 } 950 951 // ========================================================================== 952 // Internal methods - called by the ProcedureExecutor 953 // ========================================================================== 954 955 /** 956 * Internal method called by the ProcedureExecutor that starts the user-level code execute(). 957 * @throws ProcedureSuspendedException This is used when procedure wants to halt processing and 958 * skip out without changing states or releasing any locks 959 * held. 
960 */ 961 protected Procedure<TEnvironment>[] doExecute(TEnvironment env) 962 throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { 963 try { 964 updateTimestamp(); 965 if (bypass) { 966 LOG.info("{} bypassed, returning null to finish it", this); 967 return null; 968 } 969 return execute(env); 970 } finally { 971 updateTimestamp(); 972 } 973 } 974 975 /** 976 * Internal method called by the ProcedureExecutor that starts the user-level code rollback(). 977 */ 978 protected void doRollback(TEnvironment env) throws IOException, InterruptedException { 979 try { 980 updateTimestamp(); 981 if (bypass) { 982 LOG.info("{} bypassed, skipping rollback", this); 983 return; 984 } 985 rollback(env); 986 } finally { 987 updateTimestamp(); 988 } 989 } 990 991 final void restoreLock(TEnvironment env) { 992 if (!lockedWhenLoading) { 993 LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock.", this); 994 return; 995 } 996 997 if (isFinished()) { 998 LOG.debug("{} is already finished, skip acquiring lock.", this); 999 return; 1000 } 1001 1002 if (isBypass()) { 1003 LOG.debug("{} is already bypassed, skip acquiring lock.", this); 1004 return; 1005 } 1006 // this can happen if the parent stores the sub procedures but before it can 1007 // release its lock, the master restarts 1008 if (getState() == ProcedureState.WAITING && !holdLock(env)) { 1009 LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this); 1010 lockedWhenLoading = false; 1011 return; 1012 } 1013 LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this); 1014 LockState state = acquireLock(env); 1015 assert state == LockState.LOCK_ACQUIRED; 1016 } 1017 1018 /** 1019 * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock(). 
1020 */ 1021 final LockState doAcquireLock(TEnvironment env, ProcedureStore store) { 1022 if (waitInitialized(env)) { 1023 return LockState.LOCK_EVENT_WAIT; 1024 } 1025 if (lockedWhenLoading) { 1026 // reset it so we will not consider it anymore 1027 lockedWhenLoading = false; 1028 locked = true; 1029 // Here we return without persist the locked state, as lockedWhenLoading is true means 1030 // that the locked field of the procedure stored in procedure store is true, so we do not need 1031 // to store it again. 1032 return LockState.LOCK_ACQUIRED; 1033 } 1034 LockState state = acquireLock(env); 1035 if (state == LockState.LOCK_ACQUIRED) { 1036 locked = true; 1037 // persist that we have held the lock. This must be done before we actually execute the 1038 // procedure, otherwise when restarting, we may consider the procedure does not have a lock, 1039 // but it may have already done some changes as we have already executed it, and if another 1040 // procedure gets the lock, then the semantic will be broken if the holdLock is true, as we do 1041 // not expect that another procedure can be executed in the middle. 1042 store.update(this); 1043 } 1044 return state; 1045 } 1046 1047 /** 1048 * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock(). 1049 */ 1050 final void doReleaseLock(TEnvironment env, ProcedureStore store) { 1051 locked = false; 1052 // persist that we have released the lock. This must be done before we actually release the 1053 // lock. Another procedure may take this lock immediately after we release the lock, and if we 1054 // crash before persist the information that we have already released the lock, then when 1055 // restarting there will be two procedures which both have the lock and cause problems. 
1056 if (getState() != ProcedureState.ROLLEDBACK) { 1057 // If the state is ROLLEDBACK, it means that we have already deleted the procedure from 1058 // procedure store, so do not need to log the release operation any more. 1059 store.update(this); 1060 } 1061 releaseLock(env); 1062 } 1063 1064 protected final ProcedureSuspendedException suspend(int timeoutMillis, boolean jitter) 1065 throws ProcedureSuspendedException { 1066 if (jitter) { 1067 // 10% possible jitter 1068 double add = (double) timeoutMillis * ThreadLocalRandom.current().nextDouble(0.1); 1069 timeoutMillis += add; 1070 } 1071 setTimeout(timeoutMillis); 1072 setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); 1073 skipPersistence(); 1074 throw new ProcedureSuspendedException(); 1075 } 1076 1077 @Override 1078 public int compareTo(final Procedure<TEnvironment> other) { 1079 return Long.compare(getProcId(), other.getProcId()); 1080 } 1081 1082 // ========================================================================== 1083 // misc utils 1084 // ========================================================================== 1085 1086 /** 1087 * Get an hashcode for the specified Procedure ID 1088 * @return the hashcode for the specified procId 1089 */ 1090 public static long getProcIdHashCode(long procId) { 1091 long h = procId; 1092 h ^= h >> 16; 1093 h *= 0x85ebca6b; 1094 h ^= h >> 13; 1095 h *= 0xc2b2ae35; 1096 h ^= h >> 16; 1097 return h; 1098 } 1099 1100 /** 1101 * Helper to lookup the root Procedure ID given a specified procedure. 1102 */ 1103 protected static <T> Long getRootProcedureId(Map<Long, Procedure<T>> procedures, 1104 Procedure<T> proc) { 1105 while (proc.hasParent()) { 1106 proc = procedures.get(proc.getParentProcId()); 1107 if (proc == null) { 1108 return null; 1109 } 1110 } 1111 return proc.getProcId(); 1112 } 1113 1114 /** 1115 * @param a the first procedure to be compared. 1116 * @param b the second procedure to be compared. 
* @return true if the two procedures have the same parent
   */
  public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
    // A procedure without a parent never shares one.
    if (!a.hasParent() || !b.hasParent()) {
      return false;
    }
    return a.getParentProcId() == b.getParentProcId();
  }
}