001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import edu.umd.cs.findbugs.annotations.Nullable; 021import java.io.IOException; 022import java.io.UncheckedIOException; 023import java.util.ArrayDeque; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Comparator; 028import java.util.Deque; 029import java.util.HashSet; 030import java.util.List; 031import java.util.PriorityQueue; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.CopyOnWriteArrayList; 035import java.util.concurrent.ExecutorService; 036import java.util.concurrent.Executors; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.ThreadFactory; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicBoolean; 042import java.util.concurrent.atomic.AtomicInteger; 043import java.util.concurrent.atomic.AtomicLong; 044import java.util.stream.Collectors; 045import java.util.stream.Stream; 046import org.apache.hadoop.conf.Configuration; 047import 
org.apache.hadoop.hbase.HConstants; 048import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 049import org.apache.hadoop.hbase.log.HBaseMarkers; 050import org.apache.hadoop.hbase.procedure2.Procedure.LockState; 051import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 052import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 053import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 054import org.apache.hadoop.hbase.procedure2.trace.ProcedureSpanBuilder; 055import org.apache.hadoop.hbase.procedure2.util.StringUtils; 056import org.apache.hadoop.hbase.security.User; 057import org.apache.hadoop.hbase.trace.TraceUtil; 058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 059import org.apache.hadoop.hbase.util.IdLock; 060import org.apache.hadoop.hbase.util.NonceKey; 061import org.apache.hadoop.hbase.util.Threads; 062import org.apache.yetus.audience.InterfaceAudience; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 067import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 068 069import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 070 071/** 072 * Thread Pool that executes the submitted procedures. The executor has a ProcedureStore associated. 073 * Each operation is logged and on restart the pending procedures are resumed. Unless the Procedure 074 * code throws an error (e.g. invalid user input) the procedure will complete (at some point in 075 * time), On restart the pending procedures are resumed and the once failed will be rolledback. 
The 076 * user can add procedures to the executor via submitProcedure(proc) check for the finished state 077 * via isFinished(procId) and get the result via getResult(procId) 078 */ 079@InterfaceAudience.Private 080public class ProcedureExecutor<TEnvironment> { 081 private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); 082 083 public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; 084 private static final boolean DEFAULT_CHECK_OWNER_SET = false; 085 086 public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = 087 "hbase.procedure.worker.keep.alive.time.msec"; 088 private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); 089 090 public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; 091 static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min 092 093 public static final String EVICT_ACKED_TTL_CONF_KEY = "hbase.procedure.cleaner.acked.evict.ttl"; 094 static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min 095 096 /** 097 * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to break PE 098 * having it fail at various junctures. When non-null, testing is set to an instance of the below 099 * internal {@link Testing} class with flags set for the particular test. 100 */ 101 volatile Testing testing = null; 102 103 /** 104 * Class with parameters describing how to fail/die when in testing-context. 105 */ 106 public static class Testing { 107 protected volatile boolean killIfHasParent = true; 108 protected volatile boolean killIfSuspended = false; 109 110 /** 111 * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is 112 * persisting all the state it needs to recover after a crash. 
113 */ 114 protected volatile boolean killBeforeStoreUpdate = false; 115 protected volatile boolean toggleKillBeforeStoreUpdate = false; 116 117 /** 118 * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978 119 * is about a case where memory-state was being set after store to WAL where a crash could cause 120 * us to get stuck. This flag allows killing at what was a vulnerable time. 121 */ 122 protected volatile boolean killAfterStoreUpdate = false; 123 protected volatile boolean toggleKillAfterStoreUpdate = false; 124 125 protected volatile boolean killBeforeStoreUpdateInRollback = false; 126 protected volatile boolean toggleKillBeforeStoreUpdateInRollback = false; 127 128 protected boolean shouldKillBeforeStoreUpdate() { 129 final boolean kill = this.killBeforeStoreUpdate; 130 if (this.toggleKillBeforeStoreUpdate) { 131 this.killBeforeStoreUpdate = !kill; 132 LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); 133 } 134 return kill; 135 } 136 137 protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) { 138 if (isSuspended && !killIfSuspended) { 139 return false; 140 } 141 if (hasParent && !killIfHasParent) { 142 return false; 143 } 144 return shouldKillBeforeStoreUpdate(); 145 } 146 147 protected boolean shouldKillAfterStoreUpdate() { 148 final boolean kill = this.killAfterStoreUpdate; 149 if (this.toggleKillAfterStoreUpdate) { 150 this.killAfterStoreUpdate = !kill; 151 LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate); 152 } 153 return kill; 154 } 155 156 protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { 157 return (isSuspended && !killIfSuspended) ? 
false : shouldKillAfterStoreUpdate(); 158 } 159 160 protected boolean shouldKillBeforeStoreUpdateInRollback() { 161 final boolean kill = this.killBeforeStoreUpdateInRollback; 162 if (this.toggleKillBeforeStoreUpdateInRollback) { 163 this.killBeforeStoreUpdateInRollback = !kill; 164 LOG.warn("Toggle KILL before store update in rollback to: " 165 + this.killBeforeStoreUpdateInRollback); 166 } 167 return kill; 168 } 169 } 170 171 public interface ProcedureExecutorListener { 172 void procedureLoaded(long procId); 173 174 void procedureAdded(long procId); 175 176 void procedureFinished(long procId); 177 } 178 179 /** 180 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. Once a 181 * Root-Procedure completes (success or failure), the result will be added to this map. The user 182 * of ProcedureExecutor should call getResult(procId) to get the result. 183 */ 184 private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed = 185 new ConcurrentHashMap<>(); 186 187 /** 188 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. 189 * The RootProcedureState contains the execution stack of the Root-Procedure, It is added to the 190 * map by submitProcedure() and removed on procedure completion. 191 */ 192 private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack = 193 new ConcurrentHashMap<>(); 194 195 /** 196 * Helper map to lookup the live procedures by ID. This map contains every procedure. 197 * root-procedures and subprocedures. 198 */ 199 private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures = 200 new ConcurrentHashMap<>(); 201 202 /** 203 * Helper map to lookup whether the procedure already issued from the same client. This map 204 * contains every root procedure. 
*/
private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

// Registered ProcedureExecutorListener instances.
// NOTE(review): the register/notify code is outside this section - confirm how these are used.
private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
  new CopyOnWriteArrayList<>();

// Current configuration. Assigned by the constructor and replaced by refreshConfiguration(conf).
private Configuration conf;

/**
 * Created in the {@link #init(int, boolean)} method. {@link #join()} does not destroy it; it only
 * checks for still-active threads in the group and logs/lists them. Overridden when we do the
 * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
 */
private ThreadGroup threadGroup;

/**
 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
 * resource handling rather than observing in a #join is unexpected). Overridden when we do the
 * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
 */
private CopyOnWriteArrayList<WorkerThread> workerThreads;

/**
 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
 * resource handling rather than observing in a #join is unexpected). Overridden when we do the
 * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
 */
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

/**
 * WorkerMonitor checks for stuck workers and creates a new worker thread when necessary, for
 * example if there is no worker to assign meta, it will create a new worker thread for it, so it
 * is very important. TimeoutExecutor executes many tasks like DeadServerMetricRegionChore,
 * RegionInTransitionChore and so on; some tasks may execute for a long time and so would block
 * other tasks like WorkerMonitor, so we use a dedicated thread for executing WorkerMonitor.
 */
private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

// Single-thread pool created in init(). It runs forceUpdateProcedure(procId) for each id passed
// to the store listener's forceUpdate callback, and is shut down in stop().
private ExecutorService forceUpdateExecutor;

// A thread pool for executing some asynchronous tasks for procedures, you can find references to
// getAsyncTaskExecutor to see the usage. Created in init() and shut down in stop().
private ExecutorService asyncTaskExecutor;

// Both are assigned in init(): corePoolSize = numThreads, maxPoolSize = 10 * numThreads.
private int corePoolSize;
private int maxPoolSize;

// Worker keep-alive time in milliseconds; setKeepAliveTime() converts from the caller's unit.
private volatile long keepAliveTime;

/**
 * Scheduler/Queue that contains runnable procedures.
 */
private final ProcedureScheduler scheduler;

// -1 until the store load reports the max procedure id via setMaxProcId(); reset to -1 in join().
private final AtomicLong lastProcId = new AtomicLong(-1);
// Reset to 0 in init() before the core worker threads are created.
private final AtomicLong workerId = new AtomicLong(0);
// Exposed through getActiveExecutorCount().
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
// Flipped to true by startWorkers() and back to false by stop().
private final AtomicBoolean running = new AtomicBoolean(false);
private final TEnvironment environment;
private final ProcedureStore store;

// Value of CHECK_OWNER_SET_CONF_KEY, read once in the constructor.
private final boolean checkOwnerSet;

// To prevent concurrent execution of the same procedure.
// For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
// procedure is woken up before we finish the suspend which causes the same procedures to be
// executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
// a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
// execution of the same procedure.
private final IdLock procExecutionLock = new IdLock();

/**
 * Creates an executor backed by a new {@link SimpleProcedureScheduler}.
 * @param conf        configuration to read the executor settings from
 * @param environment environment object handed to the procedures
 * @param store       store the procedures are persisted to
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
  final ProcedureStore store) {
  this(conf, environment, store, new SimpleProcedureScheduler());
}

/**
 * Tells whether the root of the given procedure is gone from the live procedure map or reports
 * itself finished.
 */
private boolean isRootFinished(Procedure<?> proc) {
  final Procedure<?> root = procedures.get(proc.getRootProcId());
  if (root == null) {
    return true;
  }
  return root.isFinished();
}

/**
 * Writes the current state of the procedure with the given id to the store again, holding the
 * per-procedure execution lock for the duration. Nothing is written when
 * <ul>
 * <li>the live procedure is a finished sub procedure whose root is finished too, or</li>
 * <li>the procedure is not live and there is no retained completed entry, the retained entry is
 * a {@code FailedProcedure}, or the retained entry is already expired.</li>
 * </ul>
 * @param procId id of the procedure to update
 * @throws IOException if the store update fails
 */
private void forceUpdateProcedure(long procId) throws IOException {
  final IdLock.Entry entry = procExecutionLock.getLockEntry(procId);
  try {
    Procedure<TEnvironment> target = procedures.get(procId);
    if (target == null) {
      // Not a live procedure, fall back to the retained completed procedures.
      final CompletedProcedureRetainer<TEnvironment> done = completed.get(procId);
      if (done == null || done.getProcedure() instanceof FailedProcedure) {
        LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
        return;
      }
      final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
      final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
      final long now = EnvironmentEdgeManager.currentTime();
      if (done.isExpired(now, evictTtl, evictAckTtl)) {
        LOG.debug("Procedure {} has already been finished and expired, skip force updating",
          procId);
        return;
      }
      target = done.getProcedure();
    } else if (target.isFinished() && target.hasParent() && isRootFinished(target)) {
      LOG.debug("Procedure {} has already been finished and parent is succeeded,"
        + " skip force updating", target);
      return;
    }
    LOG.debug("Force update procedure {}", target);
    store.update(target);
  } finally {
    procExecutionLock.releaseLockEntry(entry);
  }
}

/**
 * Creates an executor that uses the given scheduler.
 * @param conf        configuration to read the executor settings from
 * @param environment environment object handed to the procedures
 * @param store       store the procedures are persisted to
 * @param scheduler   scheduler holding the runnable procedures
 */
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
  final ProcedureStore store, final ProcedureScheduler scheduler) {
  this.environment = environment;
  this.scheduler = scheduler;
  this.store = store;
  this.conf = conf;
  this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
  refreshConfiguration(conf);
}

/**
 * Replays the store into this (still empty) executor.
 * @param abortOnCorruption whether corrupted procedures found during the replay make the load
 *                          fail with an IOException
 * @throws IOException if the store fails to load, or on corruption when requested
 */
private void load(final boolean abortOnCorruption) throws IOException {
  // All the in-memory state must be pristine before we start the replay.
  Preconditions.checkArgument(completed.isEmpty(), "completed not empty: %s", completed);
  Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty: %s",
    rollbackStack);
  Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty: %s", procedures);
  Preconditions.checkArgument(scheduler.size() == 0, "scheduler queue not empty: %s", scheduler);

  final ProcedureStore.ProcedureLoader loader = new ProcedureStore.ProcedureLoader() {
    @Override
    public void setMaxProcId(long maxProcId) {
      assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
      lastProcId.set(maxProcId);
    }

    @Override
    public void load(ProcedureIterator procIter) throws IOException {
      loadProcedures(procIter);
    }

    @Override
    public void handleCorrupted(ProcedureIterator procIter) throws IOException {
      // Log every corrupted procedure, then decide whether the total is fatal.
      int corrupted = 0;
      for (; procIter.hasNext(); corrupted++) {
        Procedure<?> proc = procIter.next();
        LOG.error("Corrupt " + proc);
      }
      if (corrupted > 0 && abortOnCorruption) {
        throw new IOException("found " + corrupted + " corrupted procedure(s) on replay");
      }
    }
  };
  store.load(loader);
}

/** Restores the lock of a single procedure and records its id as restored. */
private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
  proc.restoreLock(getEnvironment());
  restored.add(proc.getProcId());
}

/** Drains the stack from the top, restoring the lock of every procedure popped. */
private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
  while (!stack.isEmpty()) {
    final Procedure<TEnvironment> top = stack.pop();
    restoreLock(top, restored);
  }
}

// Restore the locks for all the procedures.
374 // Notice that we need to restore the locks starting from the root proc, otherwise there will be 375 // problem that a sub procedure may hold the exclusive lock first and then we are stuck when 376 // calling the acquireLock method for the parent procedure. 377 // The algorithm is straight-forward: 378 // 1. Use a set to record the procedures which locks have already been restored. 379 // 2. Use a stack to store the hierarchy of the procedures 380 // 3. For all the procedure, we will first try to find its parent and push it into the stack, 381 // unless 382 // a. We have no parent, i.e, we are the root procedure 383 // b. The lock has already been restored(by checking the set introduced in #1) 384 // then we start to pop the stack and call acquireLock for each procedure. 385 // Notice that this should be done for all procedures, not only the ones in runnableList. 386 private void restoreLocks() { 387 Set<Long> restored = new HashSet<>(); 388 Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>(); 389 procedures.values().forEach(proc -> { 390 for (;;) { 391 if (restored.contains(proc.getProcId())) { 392 restoreLocks(stack, restored); 393 return; 394 } 395 if (!proc.hasParent()) { 396 restoreLock(proc, restored); 397 restoreLocks(stack, restored); 398 return; 399 } 400 stack.push(proc); 401 proc = procedures.get(proc.getParentProcId()); 402 } 403 }); 404 } 405 406 private void initializeStacks(ProcedureIterator procIter, 407 List<Procedure<TEnvironment>> runnableList, List<Procedure<TEnvironment>> failedList, 408 List<Procedure<TEnvironment>> waitingList, List<Procedure<TEnvironment>> waitingTimeoutList) 409 throws IOException { 410 procIter.reset(); 411 while (procIter.hasNext()) { 412 if (procIter.isNextFinished()) { 413 procIter.skipNext(); 414 continue; 415 } 416 417 @SuppressWarnings("unchecked") 418 Procedure<TEnvironment> proc = procIter.next(); 419 assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc; 420 
LOG.debug("Loading {}", proc); 421 Long rootProcId = getRootProcedureId(proc); 422 // The orphan procedures will be passed to handleCorrupted, so add an assert here 423 assert rootProcId != null; 424 425 if (proc.hasParent()) { 426 Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId()); 427 if (parent != null && !proc.isFinished()) { 428 parent.incChildrenLatch(); 429 } 430 } 431 432 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 433 procStack.loadStack(proc); 434 435 proc.setRootProcId(rootProcId); 436 switch (proc.getState()) { 437 case RUNNABLE: 438 runnableList.add(proc); 439 break; 440 case WAITING: 441 waitingList.add(proc); 442 break; 443 case WAITING_TIMEOUT: 444 waitingTimeoutList.add(proc); 445 break; 446 case FAILED: 447 failedList.add(proc); 448 break; 449 case ROLLEDBACK: 450 case INITIALIZING: 451 String msg = "Unexpected " + proc.getState() + " state for " + proc; 452 LOG.error(msg); 453 throw new UnsupportedOperationException(msg); 454 default: 455 break; 456 } 457 } 458 rollbackStack.forEach((rootProcId, procStack) -> { 459 if (procStack.getSubproceduresStack() != null) { 460 // if we have already record some stack ids, it means we support rollback 461 procStack.setRollbackSupported(true); 462 } else { 463 // otherwise, test the root procedure to see if we support rollback 464 procStack.setRollbackSupported(procedures.get(rootProcId).isRollbackSupported()); 465 } 466 }); 467 } 468 469 private void processWaitingProcedures(List<Procedure<TEnvironment>> waitingList, 470 List<Procedure<TEnvironment>> runnableList) { 471 waitingList.forEach(proc -> { 472 if (!proc.hasChildren()) { 473 // Normally, WAITING procedures should be waken by its children. But, there is a case that, 474 // all the children are successful and before they can wake up their parent procedure, the 475 // master was killed. 
So, during recovering the procedures from ProcedureWal, its children 476 // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING 477 // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a 478 // exception will throw: 479 // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 480 // "NOT RUNNABLE! " + procedure.toString()); 481 proc.setState(ProcedureState.RUNNABLE); 482 runnableList.add(proc); 483 } else { 484 proc.afterReplay(getEnvironment()); 485 } 486 }); 487 } 488 489 private void processWaitingTimeoutProcedures(List<Procedure<TEnvironment>> waitingTimeoutList) { 490 waitingTimeoutList.forEach(proc -> { 491 proc.afterReplay(getEnvironment()); 492 timeoutExecutor.add(proc); 493 }); 494 } 495 496 private void pushProceduresAfterLoad(List<Procedure<TEnvironment>> runnableList, 497 List<Procedure<TEnvironment>> failedList) { 498 failedList.forEach(scheduler::addBack); 499 // Put the procedures which have been executed first 500 // For table procedures, to prevent concurrent modifications, we only allow one procedure to run 501 // for a single table at the same time, this is done via inserting a waiting queue before 502 // actually add the procedure to run queue. So when loading here, we should add the procedures 503 // which have been executed first, otherwise another procedure which was in the waiting queue 504 // before restarting may be added to run queue first and still cause concurrent modifications. 
505 // See HBASE-28263 for the reason why we need this 506 runnableList.sort((p1, p2) -> { 507 if (p1.wasExecuted()) { 508 if (p2.wasExecuted()) { 509 return Long.compare(p1.getProcId(), p2.getProcId()); 510 } else { 511 return -1; 512 } 513 } else { 514 if (p2.wasExecuted()) { 515 return 1; 516 } else { 517 return Long.compare(p1.getProcId(), p2.getProcId()); 518 } 519 } 520 }); 521 runnableList.forEach(p -> { 522 p.afterReplay(getEnvironment()); 523 if (!p.hasParent()) { 524 sendProcedureLoadedNotification(p.getProcId()); 525 } 526 scheduler.addBack(p); 527 }); 528 } 529 530 private void loadProcedures(ProcedureIterator procIter) throws IOException { 531 // 1. Build the rollback stack 532 int runnableCount = 0; 533 int failedCount = 0; 534 int waitingCount = 0; 535 int waitingTimeoutCount = 0; 536 while (procIter.hasNext()) { 537 boolean finished = procIter.isNextFinished(); 538 @SuppressWarnings("unchecked") 539 Procedure<TEnvironment> proc = procIter.next(); 540 NonceKey nonceKey = proc.getNonceKey(); 541 long procId = proc.getProcId(); 542 543 if (finished) { 544 completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc)); 545 LOG.debug("Completed {}", proc); 546 } else { 547 if (!proc.hasParent()) { 548 assert !proc.isFinished() : "unexpected finished procedure"; 549 rollbackStack.put(proc.getProcId(), new RootProcedureState<>()); 550 } 551 552 // add the procedure to the map 553 proc.beforeReplay(getEnvironment()); 554 procedures.put(proc.getProcId(), proc); 555 switch (proc.getState()) { 556 case RUNNABLE: 557 runnableCount++; 558 break; 559 case FAILED: 560 failedCount++; 561 break; 562 case WAITING: 563 waitingCount++; 564 break; 565 case WAITING_TIMEOUT: 566 waitingTimeoutCount++; 567 break; 568 default: 569 break; 570 } 571 } 572 573 if (nonceKey != null) { 574 nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map 575 } 576 } 577 578 // 2. 
Initialize the stacks: In the old implementation, for procedures in FAILED state, we will 579 // push it into the ProcedureScheduler directly to execute the rollback. But this does not work 580 // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove 581 // the queue from runQueue in scheduler, and then when a procedure which has lock access, for 582 // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler, 583 // we will add the queue back to let the workers poll from it. The assumption here is that, the 584 // procedure which has the xlock should have been polled out already, so when loading we can not 585 // add the procedure to scheduler first and then call acquireLock, since the procedure is still 586 // in the queue, and since we will remove the queue from runQueue, then no one can poll it out, 587 // then there is a dead lock 588 List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount); 589 List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount); 590 List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount); 591 List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount); 592 593 initializeStacks(procIter, runnableList, failedList, waitingList, waitingTimeoutList); 594 595 // 3. Check the waiting procedures to see if some of them can be added to runnable. 596 processWaitingProcedures(waitingList, runnableList); 597 598 // 4. restore locks 599 restoreLocks(); 600 601 // 5. Push the procedures to the timeout executor 602 processWaitingTimeoutProcedures(waitingTimeoutList); 603 604 // 6. Push the procedure to the scheduler 605 pushProceduresAfterLoad(runnableList, failedList); 606 // After all procedures put into the queue, signal the worker threads. 607 // Otherwise, there is a race condition. See HBASE-21364. 
608 scheduler.signalAll(); 609 } 610 611 /** 612 * Initialize the procedure executor, but do not start workers. We will start them later. 613 * <p/> 614 * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and 615 * ensure a single executor, and start the procedure replay to resume and recover the previous 616 * pending and in-progress procedures. 617 * @param numThreads number of threads available for procedure execution. 618 * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure 619 * is found on replay. otherwise false. 620 */ 621 public void init(int numThreads, boolean abortOnCorruption) throws IOException { 622 // We have numThreads executor + one timer thread used for timing out 623 // procedures and triggering periodic procedures. 624 this.corePoolSize = numThreads; 625 this.maxPoolSize = 10 * numThreads; 626 LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}", 627 corePoolSize, maxPoolSize); 628 629 this.threadGroup = new ThreadGroup("PEWorkerGroup"); 630 this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout"); 631 this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor"); 632 ThreadFactory backingThreadFactory = new ThreadFactory() { 633 634 @Override 635 public Thread newThread(Runnable r) { 636 return new Thread(threadGroup, r); 637 } 638 }; 639 int size = Math.max(2, Runtime.getRuntime().availableProcessors()); 640 ThreadPoolExecutor executor = 641 new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), 642 new ThreadFactoryBuilder().setDaemon(true) 643 .setNameFormat(getClass().getSimpleName() + "-Async-Task-Executor-%d") 644 .setThreadFactory(backingThreadFactory).build()); 645 executor.allowCoreThreadTimeOut(true); 646 this.asyncTaskExecutor = executor; 647 forceUpdateExecutor = Executors.newFixedThreadPool(1, new 
ThreadFactoryBuilder().setDaemon(true) 648 .setNameFormat("Force-Update-PEWorker-%d").setThreadFactory(backingThreadFactory).build()); 649 store.registerListener(new ProcedureStoreListener() { 650 651 @Override 652 public void forceUpdate(long[] procIds) { 653 Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> { 654 try { 655 forceUpdateProcedure(procId); 656 } catch (IOException e) { 657 LOG.warn("Failed to force update procedure with pid={}", procId); 658 } 659 })); 660 } 661 }); 662 663 // Create the workers 664 workerId.set(0); 665 workerThreads = new CopyOnWriteArrayList<>(); 666 for (int i = 0; i < corePoolSize; ++i) { 667 workerThreads.add(new WorkerThread(threadGroup)); 668 } 669 670 long st, et; 671 672 // Acquire the store lease. 673 st = System.nanoTime(); 674 store.recoverLease(); 675 et = System.nanoTime(); 676 LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(), 677 StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); 678 679 // start the procedure scheduler 680 scheduler.start(); 681 682 // TODO: Split in two steps. 683 // TODO: Handle corrupted procedures (currently just a warn) 684 // The first one will make sure that we have the latest id, 685 // so we can start the threads and accept new procedures. 686 // The second step will do the actual load of old procedures. 687 st = System.nanoTime(); 688 load(abortOnCorruption); 689 et = System.nanoTime(); 690 LOG.info("Loaded {} in {}", store.getClass().getSimpleName(), 691 StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); 692 } 693 694 /** 695 * Start the workers. 696 */ 697 public void startWorkers() throws IOException { 698 if (!running.compareAndSet(false, true)) { 699 LOG.warn("Already running"); 700 return; 701 } 702 // Start the executors. Here we must have the lastProcId set. 
703 LOG.trace("Start workers {}", workerThreads.size()); 704 timeoutExecutor.start(); 705 workerMonitorExecutor.start(); 706 for (WorkerThread worker : workerThreads) { 707 worker.start(); 708 } 709 710 // Internal chores 711 workerMonitorExecutor.add(new WorkerMonitor()); 712 713 // Add completed cleaner chore 714 addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed, 715 nonceKeysToProcIdsMap)); 716 } 717 718 public void stop() { 719 // it is possible that we fail in init, while loading procedures, so we will not set running to 720 // true but we should have already started the ProcedureScheduler, and also the two 721 // ExecutorServices, so here we do not check running state, just stop them 722 running.set(false); 723 LOG.info("Stopping"); 724 scheduler.stop(); 725 timeoutExecutor.sendStopSignal(); 726 workerMonitorExecutor.sendStopSignal(); 727 forceUpdateExecutor.shutdown(); 728 asyncTaskExecutor.shutdown(); 729 } 730 731 public void join() { 732 assert !isRunning() : "expected not running"; 733 734 // stop the timeout executor 735 timeoutExecutor.awaitTermination(); 736 // stop the work monitor executor 737 workerMonitorExecutor.awaitTermination(); 738 739 // stop the worker threads 740 for (WorkerThread worker : workerThreads) { 741 worker.awaitTermination(); 742 } 743 try { 744 if (!forceUpdateExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 745 LOG.warn("There are still pending tasks in forceUpdateExecutor"); 746 } 747 } catch (InterruptedException e) { 748 LOG.warn("interrupted while waiting for forceUpdateExecutor termination", e); 749 Thread.currentThread().interrupt(); 750 } 751 try { 752 if (!asyncTaskExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 753 LOG.warn("There are still pending tasks in asyncTaskExecutor"); 754 } 755 } catch (InterruptedException e) { 756 LOG.warn("interrupted while waiting for asyncTaskExecutor termination", e); 757 Thread.currentThread().interrupt(); 758 } 759 760 // log the still active 
threads, ThreadGroup.destroy is deprecated in JDK17 and it is not 761 // necessary for us to must destroy it here, so we just do a check and log 762 if (threadGroup.activeCount() > 0) { 763 LOG.error("There are still active thread in group {}, see STDOUT", threadGroup); 764 threadGroup.list(); 765 } 766 767 // reset the in-memory state for testing 768 completed.clear(); 769 rollbackStack.clear(); 770 procedures.clear(); 771 nonceKeysToProcIdsMap.clear(); 772 scheduler.clear(); 773 lastProcId.set(-1); 774 } 775 776 public void refreshConfiguration(final Configuration conf) { 777 this.conf = conf; 778 setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, DEFAULT_WORKER_KEEP_ALIVE_TIME), 779 TimeUnit.MILLISECONDS); 780 } 781 782 // ========================================================================== 783 // Accessors 784 // ========================================================================== 785 public boolean isRunning() { 786 return running.get(); 787 } 788 789 /** Returns the current number of worker threads. */ 790 public int getWorkerThreadCount() { 791 return workerThreads.size(); 792 } 793 794 /** Returns the core pool size settings. 
*/ 795 public int getCorePoolSize() { 796 return corePoolSize; 797 } 798 799 public int getActiveExecutorCount() { 800 return activeExecutorCount.get(); 801 } 802 803 public TEnvironment getEnvironment() { 804 return this.environment; 805 } 806 807 public ProcedureStore getStore() { 808 return this.store; 809 } 810 811 ProcedureScheduler getScheduler() { 812 return scheduler; 813 } 814 815 public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) { 816 this.keepAliveTime = timeUnit.toMillis(keepAliveTime); 817 this.scheduler.signalAll(); 818 } 819 820 public long getKeepAliveTime(final TimeUnit timeUnit) { 821 return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS); 822 } 823 824 // ========================================================================== 825 // Submit/Remove Chores 826 // ========================================================================== 827 828 /** 829 * Add a chore procedure to the executor 830 * @param chore the chore to add 831 */ 832 public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) { 833 if (chore == null) { 834 return; 835 } 836 chore.setState(ProcedureState.WAITING_TIMEOUT); 837 timeoutExecutor.add(chore); 838 } 839 840 /** 841 * Remove a chore procedure from the executor 842 * @param chore the chore to remove 843 * @return whether the chore is removed, or it will be removed later 844 */ 845 public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) { 846 if (chore == null) { 847 return true; 848 } 849 chore.setState(ProcedureState.SUCCESS); 850 return timeoutExecutor.remove(chore); 851 } 852 853 // ========================================================================== 854 // Nonce Procedure helpers 855 // ========================================================================== 856 /** 857 * Create a NonceKey from the specified nonceGroup and nonce. 
858 * @param nonceGroup the group to use for the {@link NonceKey} 859 * @param nonce the nonce to use in the {@link NonceKey} 860 * @return the generated NonceKey 861 */ 862 public NonceKey createNonceKey(final long nonceGroup, final long nonce) { 863 return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce); 864 } 865 866 /** 867 * Register a nonce for a procedure that is going to be submitted. A procId will be reserved and 868 * on submitProcedure(), the procedure with the specified nonce will take the reserved ProcId. If 869 * someone already reserved the nonce, this method will return the procId reserved, otherwise an 870 * invalid procId will be returned. and the caller should procede and submit the procedure. 871 * @param nonceKey A unique identifier for this operation from the client or process. 872 * @return the procId associated with the nonce, if any otherwise an invalid procId. 873 */ 874 public long registerNonce(final NonceKey nonceKey) { 875 if (nonceKey == null) { 876 return -1; 877 } 878 879 // check if we have already a Reserved ID for the nonce 880 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); 881 if (oldProcId == null) { 882 // reserve a new Procedure ID, this will be associated with the nonce 883 // and the procedure submitted with the specified nonce will use this ID. 884 final long newProcId = nextProcId(); 885 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); 886 if (oldProcId == null) { 887 return -1; 888 } 889 } 890 891 // we found a registered nonce, but the procedure may not have been submitted yet. 892 // since the client expect the procedure to be submitted, spin here until it is. 
893 final boolean traceEnabled = LOG.isTraceEnabled(); 894 while ( 895 isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) 896 && nonceKeysToProcIdsMap.containsKey(nonceKey) 897 ) { 898 if (traceEnabled) { 899 LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted"); 900 } 901 Threads.sleep(100); 902 } 903 return oldProcId.longValue(); 904 } 905 906 /** 907 * Remove the NonceKey if the procedure was not submitted to the executor. 908 * @param nonceKey A unique identifier for this operation from the client or process. 909 */ 910 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { 911 if (nonceKey == null) { 912 return; 913 } 914 915 final Long procId = nonceKeysToProcIdsMap.get(nonceKey); 916 if (procId == null) { 917 return; 918 } 919 920 // if the procedure was not submitted, remove the nonce 921 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { 922 nonceKeysToProcIdsMap.remove(nonceKey); 923 } 924 } 925 926 /** 927 * If the failure failed before submitting it, we may want to give back the same error to the 928 * requests with the same nonceKey. 
929 * @param nonceKey A unique identifier for this operation from the client or process 930 * @param procName name of the procedure, used to inform the user 931 * @param procOwner name of the owner of the procedure, used to inform the user 932 * @param exception the failure to report to the user 933 */ 934 public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, 935 IOException exception) { 936 if (nonceKey == null) { 937 return; 938 } 939 940 Long procId = nonceKeysToProcIdsMap.get(nonceKey); 941 if (procId == null || completed.containsKey(procId)) { 942 return; 943 } 944 945 completed.computeIfAbsent(procId, (key) -> { 946 Procedure<TEnvironment> proc = 947 new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); 948 949 return new CompletedProcedureRetainer<>(proc); 950 }); 951 } 952 953 // ========================================================================== 954 // Submit/Abort Procedure 955 // ========================================================================== 956 /** 957 * Add a new root-procedure to the executor. 958 * @param proc the new procedure to execute. 959 * @return the procedure id, that can be used to monitor the operation 960 */ 961 public long submitProcedure(Procedure<TEnvironment> proc) { 962 return submitProcedure(proc, null); 963 } 964 965 /** 966 * Bypass a procedure. If the procedure is set to bypass, all the logic in execute/rollback will 967 * be ignored and it will return success, whatever. It is used to recover buggy stuck procedures, 968 * releasing the lock resources and letting other procedures run. Bypassing one procedure (and its 969 * ancestors will be bypassed automatically) may leave the cluster in a middle state, e.g. region 970 * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, the 971 * operators may have to do some clean up on hdfs or schedule some assign procedures to let region 972 * online. 
* DO AT YOUR OWN RISK.
 * <p>
 * A procedure can be bypassed only if 1. The procedure is in state of RUNNABLE, WAITING,
 * WAITING_TIMEOUT or it is a root procedure without any child. 2. No other worker thread is
 * executing it 3. No child procedure has been submitted
 * <p>
 * If all the requirements are met, the procedure and its ancestors will be bypassed and
 * persisted to WAL.
 * <p>
 * If the procedure is in WAITING state, it will be set to RUNNABLE and added to the run queue.
 * TODO: What about WAITING_TIMEOUT?
 * @param pids      the procedure ids
 * @param lockWait  time to wait lock
 * @param force     if force set to true, we will bypass the procedure even if it is executing.
 *                  This is for procedures which can't break out during executing (due to bug,
 *                  mostly). In this case, bypassing the procedure is not enough, since it is
 *                  already stuck there. We need to restart the master after bypassing, and
 *                  letting the problematic procedure to execute with bypass=true, so in that
 *                  condition, the procedure can be successfully bypassed.
 * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
* @return a list with one entry per given pid, in the same order; each entry is true if the
 *         bypass of that procedure succeeded
 * @throws IOException propagated from the bypass of a single procedure
 */
public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force,
  boolean recursive) throws IOException {
  List<Boolean> result = new ArrayList<Boolean>(pids.size());
  for (long pid : pids) {
    result.add(bypassProcedure(pid, lockWait, force, recursive));
  }
  return result;
}

/**
 * Bypass a single procedure and its ancestors.
 * @param pid       the id of the procedure to bypass
 * @param lockWait  how long to wait for the procedure execution lock, must be positive
 * @param override  if true, go on with the bypass even if the execution lock could not be
 *                  acquired within {@code lockWait}
 * @param recursive if true, children of the procedure are bypassed first; if false, a procedure
 *                  with children is not bypassed
 * @return true if the procedure and its ancestors were marked as bypassed, false if the bypass
 *         was skipped
 * @throws IOException declared for the operations performed while bypassing
 */
boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive)
  throws IOException {
  Preconditions.checkArgument(lockWait > 0, "lockWait should be positive");
  final Procedure<TEnvironment> procedure = getProcedure(pid);
  if (procedure == null) {
    LOG.debug("Procedure pid={} does not exist, skipping bypass", pid);
    return false;
  }

  LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", procedure, lockWait,
    override, recursive);
  // a null lockEntry means we could not get the execution lock within lockWait
  IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait);
  if (lockEntry == null && !override) {
    LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", lockWait,
      procedure, override);
    return false;
  } else if (lockEntry == null) {
    LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", lockWait,
      procedure, override);
  }
  try {
    // check whether the procedure is already finished
    if (procedure.isFinished()) {
      LOG.debug("{} is already finished, skipping bypass", procedure);
      return false;
    }

    if (procedure.hasChildren()) {
      if (recursive) {
        // EXPENSIVE. Checks each live procedure of which there could be many!!!
        // Is there another way to get children of a procedure?
        LOG.info("Recursive bypass on children of pid={}", procedure.getProcId());
        this.procedures.forEachValue(1 /* Single-threaded */,
          // Transformer
          v -> v.getParentProcId() == procedure.getProcId() ? v : null,
          // Consumer
          v -> {
            try {
              bypassProcedure(v.getProcId(), lockWait, override, recursive);
            } catch (IOException e) {
              // a failure on one child is only logged, the other children are still processed
              LOG.warn("Recursive bypass of pid={}", v.getProcId(), e);
            }
          });
      } else {
        LOG.debug("{} has children, skipping bypass", procedure);
        return false;
      }
    }

    // If the procedure has no parent or no child, we are safe to bypass it in whatever state
    if (
      procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE
        && procedure.getState() != ProcedureState.WAITING
        && procedure.getState() != ProcedureState.WAITING_TIMEOUT
    ) {
      LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states "
        + "(with no parent), {}", procedure);
      // Question: how is the bypass done here?
      return false;
    }

    // Now, the procedure is not finished, and no one can execute it since we take the lock now
    // And we can be sure that its ancestor is not running too, since their child has not
    // finished yet
    Procedure<TEnvironment> current = procedure;
    while (current != null) {
      LOG.debug("Bypassing {}", current);
      current.bypass(getEnvironment());
      store.update(current);
      // walk up the chain; the loop ends when no procedure is found for the parent id
      long parentID = current.getParentProcId();
      current = getProcedure(parentID);
    }

    // wake up waiting procedure, already checked there is no child
    if (procedure.getState() == ProcedureState.WAITING) {
      procedure.setState(ProcedureState.RUNNABLE);
      store.update(procedure);
    }

    // If state of procedure is WAITING_TIMEOUT, we can't directly submit it to the scheduler.
    // Instead we should remove it from timeout Executor queue and transfer its state to RUNNABLE
    if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
      LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure);
      if (timeoutExecutor.remove(procedure)) {
        LOG.debug("removed procedure {} from timeoutExecutor", procedure);
        timeoutExecutor.executeTimedoutProcedure(procedure);
      }
    } else if (lockEntry != null) {
      scheduler.addFront(procedure);
      LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure);
    } else {
      // If we don't have the lock, we can't re-submit the queue,
      // since it is already executing. To get rid of the stuck situation, we
      // need to restart the master. With the procedure set to bypass, the procedureExecutor
      // will bypass it and won't get stuck again.
      LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, "
        + "skipping add to queue", procedure);
    }
    return true;

  } finally {
    // only release the execution lock if we actually got it above
    if (lockEntry != null) {
      procExecutionLock.releaseLockEntry(lockEntry);
    }
  }
}

/**
 * Add a new root-procedure to the executor.
 * @param proc     the new procedure to execute.
 * @param nonceKey the registered unique identifier for this operation from the client or process.
1115 * @return the procedure id, that can be used to monitor the operation 1116 */ 1117 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", 1118 justification = "FindBugs is blind to the check-for-null") 1119 public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { 1120 Preconditions.checkArgument(lastProcId.get() >= 0); 1121 1122 prepareProcedure(proc); 1123 1124 final Long currentProcId; 1125 if (nonceKey != null) { 1126 currentProcId = nonceKeysToProcIdsMap.get(nonceKey); 1127 Preconditions.checkArgument(currentProcId != null, 1128 "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc); 1129 } else { 1130 currentProcId = nextProcId(); 1131 } 1132 1133 // Initialize the procedure 1134 proc.setNonceKey(nonceKey); 1135 proc.setProcId(currentProcId.longValue()); 1136 1137 // Commit the transaction 1138 store.insert(proc, null); 1139 LOG.debug("Stored {}", proc); 1140 1141 // Add the procedure to the executor 1142 return pushProcedure(proc); 1143 } 1144 1145 /** 1146 * Add a set of new root-procedure to the executor. 1147 * @param procs the new procedures to execute. 1148 */ 1149 // TODO: Do we need to take nonces here? 
1150 public void submitProcedures(Procedure<TEnvironment>[] procs) { 1151 Preconditions.checkArgument(lastProcId.get() >= 0); 1152 if (procs == null || procs.length <= 0) { 1153 return; 1154 } 1155 1156 // Prepare procedure 1157 for (int i = 0; i < procs.length; ++i) { 1158 prepareProcedure(procs[i]).setProcId(nextProcId()); 1159 } 1160 1161 // Commit the transaction 1162 store.insert(procs); 1163 if (LOG.isDebugEnabled()) { 1164 LOG.debug("Stored " + Arrays.toString(procs)); 1165 } 1166 1167 // Add the procedure to the executor 1168 for (int i = 0; i < procs.length; ++i) { 1169 pushProcedure(procs[i]); 1170 } 1171 } 1172 1173 private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { 1174 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); 1175 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); 1176 if (this.checkOwnerSet) { 1177 Preconditions.checkArgument(proc.hasOwner(), "missing owner"); 1178 } 1179 return proc; 1180 } 1181 1182 private long pushProcedure(Procedure<TEnvironment> proc) { 1183 final long currentProcId = proc.getProcId(); 1184 1185 // Update metrics on start of a procedure 1186 proc.updateMetricsOnSubmit(getEnvironment()); 1187 1188 // Create the rollback stack for the procedure 1189 RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); 1190 stack.setRollbackSupported(proc.isRollbackSupported()); 1191 rollbackStack.put(currentProcId, stack); 1192 1193 // Submit the new subprocedures 1194 assert !procedures.containsKey(currentProcId); 1195 procedures.put(currentProcId, proc); 1196 sendProcedureAddedNotification(currentProcId); 1197 scheduler.addBack(proc); 1198 return proc.getProcId(); 1199 } 1200 1201 /** 1202 * Send an abort notification the specified procedure. Depending on the procedure implementation 1203 * the abort can be considered or ignored. 
1204 * @param procId the procedure to abort 1205 * @return true if the procedure exists and has received the abort, otherwise false. 1206 */ 1207 public boolean abort(long procId) { 1208 return abort(procId, true); 1209 } 1210 1211 /** 1212 * Send an abort notification to the specified procedure. Depending on the procedure 1213 * implementation, the abort can be considered or ignored. 1214 * @param procId the procedure to abort 1215 * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? 1216 * @return true if the procedure exists and has received the abort, otherwise false. 1217 */ 1218 public boolean abort(long procId, boolean mayInterruptIfRunning) { 1219 Procedure<TEnvironment> proc = procedures.get(procId); 1220 if (proc != null) { 1221 if (!mayInterruptIfRunning && proc.wasExecuted()) { 1222 return false; 1223 } 1224 return proc.abort(getEnvironment()); 1225 } 1226 return false; 1227 } 1228 1229 // ========================================================================== 1230 // Executor query helpers 1231 // ========================================================================== 1232 public Procedure<TEnvironment> getProcedure(final long procId) { 1233 return procedures.get(procId); 1234 } 1235 1236 public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { 1237 Procedure<TEnvironment> proc = getProcedure(procId); 1238 if (clazz.isInstance(proc)) { 1239 return clazz.cast(proc); 1240 } 1241 return null; 1242 } 1243 1244 public Procedure<TEnvironment> getResult(long procId) { 1245 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1246 if (retainer == null) { 1247 return null; 1248 } else { 1249 return retainer.getProcedure(); 1250 } 1251 } 1252 1253 /** 1254 * Return true if the procedure is finished. The state may be "completed successfully" or "failed 1255 * and rolledback". Use getResult() to check the state or get the result data. 
1256 * @param procId the ID of the procedure to check 1257 * @return true if the procedure execution is finished, otherwise false. 1258 */ 1259 public boolean isFinished(final long procId) { 1260 return !procedures.containsKey(procId); 1261 } 1262 1263 /** 1264 * Return true if the procedure is started. 1265 * @param procId the ID of the procedure to check 1266 * @return true if the procedure execution is started, otherwise false. 1267 */ 1268 public boolean isStarted(long procId) { 1269 Procedure<?> proc = procedures.get(procId); 1270 if (proc == null) { 1271 return completed.get(procId) != null; 1272 } 1273 return proc.wasExecuted(); 1274 } 1275 1276 /** 1277 * Mark the specified completed procedure, as ready to remove. 1278 * @param procId the ID of the procedure to remove 1279 */ 1280 public void removeResult(long procId) { 1281 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1282 if (retainer == null) { 1283 assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; 1284 LOG.debug("pid={} already removed by the cleaner.", procId); 1285 return; 1286 } 1287 1288 // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 1289 retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); 1290 } 1291 1292 public Procedure<TEnvironment> getResultOrProcedure(long procId) { 1293 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1294 if (retainer == null) { 1295 return procedures.get(procId); 1296 } else { 1297 return retainer.getProcedure(); 1298 } 1299 } 1300 1301 /** 1302 * Check if the user is this procedure's owner 1303 * @param procId the target procedure 1304 * @param user the user 1305 * @return true if the user is the owner of the procedure, false otherwise or the owner is 1306 * unknown. 
1307 */ 1308 public boolean isProcedureOwner(long procId, User user) { 1309 if (user == null) { 1310 return false; 1311 } 1312 final Procedure<TEnvironment> runningProc = procedures.get(procId); 1313 if (runningProc != null) { 1314 return runningProc.getOwner().equals(user.getShortName()); 1315 } 1316 1317 final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1318 if (retainer != null) { 1319 return retainer.getProcedure().getOwner().equals(user.getShortName()); 1320 } 1321 1322 // Procedure either does not exist or has already completed and got cleaned up. 1323 // At this time, we cannot check the owner of the procedure 1324 return false; 1325 } 1326 1327 /** 1328 * Should only be used when starting up, where the procedure workers have not been started. 1329 * <p/> 1330 * If the procedure works has been started, the return values maybe changed when you are 1331 * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as 1332 * it will do a copy, and also include the finished procedures. 1333 */ 1334 public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() { 1335 return procedures.values(); 1336 } 1337 1338 /** 1339 * Get procedures. 1340 * @return the procedures in a list 1341 */ 1342 public List<Procedure<TEnvironment>> getProcedures() { 1343 List<Procedure<TEnvironment>> procedureList = 1344 new ArrayList<>(procedures.size() + completed.size()); 1345 procedureList.addAll(procedures.values()); 1346 // Note: The procedure could show up twice in the list with different state, as 1347 // it could complete after we walk through procedures list and insert into 1348 // procedureList - it is ok, as we will use the information in the Procedure 1349 // to figure it out; to prevent this would increase the complexity of the logic. 
1350 completed.values().stream().map(CompletedProcedureRetainer::getProcedure) 1351 .forEach(procedureList::add); 1352 return procedureList; 1353 } 1354 1355 // ========================================================================== 1356 // Listeners helpers 1357 // ========================================================================== 1358 public void registerListener(ProcedureExecutorListener listener) { 1359 this.listeners.add(listener); 1360 } 1361 1362 public boolean unregisterListener(ProcedureExecutorListener listener) { 1363 return this.listeners.remove(listener); 1364 } 1365 1366 private void sendProcedureLoadedNotification(final long procId) { 1367 if (!this.listeners.isEmpty()) { 1368 for (ProcedureExecutorListener listener : this.listeners) { 1369 try { 1370 listener.procedureLoaded(procId); 1371 } catch (Throwable e) { 1372 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1373 } 1374 } 1375 } 1376 } 1377 1378 private void sendProcedureAddedNotification(final long procId) { 1379 if (!this.listeners.isEmpty()) { 1380 for (ProcedureExecutorListener listener : this.listeners) { 1381 try { 1382 listener.procedureAdded(procId); 1383 } catch (Throwable e) { 1384 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1385 } 1386 } 1387 } 1388 } 1389 1390 private void sendProcedureFinishedNotification(final long procId) { 1391 if (!this.listeners.isEmpty()) { 1392 for (ProcedureExecutorListener listener : this.listeners) { 1393 try { 1394 listener.procedureFinished(procId); 1395 } catch (Throwable e) { 1396 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1397 } 1398 } 1399 } 1400 } 1401 1402 // ========================================================================== 1403 // Procedure IDs helpers 1404 // ========================================================================== 1405 private long nextProcId() { 1406 long procId = lastProcId.incrementAndGet(); 1407 if (procId < 0) { 
1408 while (!lastProcId.compareAndSet(procId, 0)) { 1409 procId = lastProcId.get(); 1410 if (procId >= 0) { 1411 break; 1412 } 1413 } 1414 while (procedures.containsKey(procId)) { 1415 procId = lastProcId.incrementAndGet(); 1416 } 1417 } 1418 assert procId >= 0 : "Invalid procId " + procId; 1419 return procId; 1420 } 1421 1422 protected long getLastProcId() { 1423 return lastProcId.get(); 1424 } 1425 1426 public Set<Long> getActiveProcIds() { 1427 return procedures.keySet(); 1428 } 1429 1430 Long getRootProcedureId(Procedure<TEnvironment> proc) { 1431 return Procedure.getRootProcedureId(procedures, proc); 1432 } 1433 1434 // ========================================================================== 1435 // Executions 1436 // ========================================================================== 1437 private void executeProcedure(Procedure<TEnvironment> proc) { 1438 if (proc.isFinished()) { 1439 LOG.debug("{} is already finished, skipping execution", proc); 1440 return; 1441 } 1442 final Long rootProcId = getRootProcedureId(proc); 1443 if (rootProcId == null) { 1444 // The 'proc' was ready to run but the root procedure was rolledback 1445 LOG.warn("Rollback because parent is done/rolledback proc=" + proc); 1446 executeRollback(proc); 1447 return; 1448 } 1449 1450 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 1451 if (procStack == null) { 1452 LOG.warn("RootProcedureState is null for " + proc.getProcId()); 1453 return; 1454 } 1455 do { 1456 // Try to acquire the execution 1457 if (!procStack.acquire(proc)) { 1458 if (procStack.setRollback()) { 1459 // we have the 'rollback-lock' we can start rollingback 1460 switch (executeRollback(rootProcId, procStack)) { 1461 case LOCK_ACQUIRED: 1462 break; 1463 case LOCK_YIELD_WAIT: 1464 procStack.unsetRollback(); 1465 scheduler.yield(proc); 1466 break; 1467 case LOCK_EVENT_WAIT: 1468 LOG.info("LOCK_EVENT_WAIT rollback..." 
+ proc); 1469 procStack.unsetRollback(); 1470 break; 1471 default: 1472 throw new UnsupportedOperationException(); 1473 } 1474 } else { 1475 // if we can't rollback means that some child is still running. 1476 // the rollback will be executed after all the children are done. 1477 // If the procedure was never executed, remove and mark it as rolledback. 1478 if (!proc.wasExecuted()) { 1479 switch (executeRollback(proc)) { 1480 case LOCK_ACQUIRED: 1481 break; 1482 case LOCK_YIELD_WAIT: 1483 scheduler.yield(proc); 1484 break; 1485 case LOCK_EVENT_WAIT: 1486 LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); 1487 break; 1488 default: 1489 throw new UnsupportedOperationException(); 1490 } 1491 } 1492 } 1493 break; 1494 } 1495 1496 // Execute the procedure 1497 assert proc.getState() == ProcedureState.RUNNABLE : proc; 1498 // Note that lock is NOT about concurrency but rather about ensuring 1499 // ownership of a procedure of an entity such as a region or table 1500 LockState lockState = acquireLock(proc); 1501 switch (lockState) { 1502 case LOCK_ACQUIRED: 1503 execProcedure(procStack, proc); 1504 break; 1505 case LOCK_YIELD_WAIT: 1506 LOG.info(lockState + " " + proc); 1507 scheduler.yield(proc); 1508 break; 1509 case LOCK_EVENT_WAIT: 1510 // Someone will wake us up when the lock is available 1511 LOG.debug(lockState + " " + proc); 1512 break; 1513 default: 1514 throw new UnsupportedOperationException(); 1515 } 1516 procStack.release(proc); 1517 1518 if (proc.isSuccess()) { 1519 // update metrics on finishing the procedure 1520 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); 1521 LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); 1522 // Finalize the procedure state 1523 if (proc.getProcId() == rootProcId) { 1524 rootProcedureFinished(proc); 1525 } else { 1526 execCompletionCleanup(proc); 1527 } 1528 break; 1529 } 1530 } while (procStack.isFailed()); 1531 } 1532 1533 private LockState 
acquireLock(Procedure<TEnvironment> proc) { 1534 TEnvironment env = getEnvironment(); 1535 // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if 1536 // hasLock is true. 1537 if (proc.hasLock()) { 1538 return LockState.LOCK_ACQUIRED; 1539 } 1540 return proc.doAcquireLock(env, store); 1541 } 1542 1543 private void releaseLock(Procedure<TEnvironment> proc, boolean force) { 1544 TEnvironment env = getEnvironment(); 1545 // For how the framework works, we know that we will always have the lock 1546 // when we call releaseLock(), so we can avoid calling proc.hasLock() 1547 if (force || !proc.holdLock(env) || proc.isFinished()) { 1548 proc.doReleaseLock(env, store); 1549 } 1550 } 1551 1552 // Returning null means we have already held the execution lock, so you do not need to get the 1553 // lock entry for releasing 1554 private IdLock.Entry getLockEntryForRollback(long procId) { 1555 // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need 1556 // this check, as the worker will hold the lock before executing a procedure. This is the only 1557 // place where we may hold two procedure execution locks, and there is a fence in the 1558 // RootProcedureState where we can make sure that only one worker can execute the rollback of 1559 // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to 1560 // prevent race between us and the force update thread. 
    if (!procExecutionLock.isHeldByCurrentThread(procId)) {
      try {
        return procExecutionLock.getLockEntry(procId);
      } catch (IOException e) {
        // can only happen if interrupted, so not a big deal to propagate it
        throw new UncheckedIOException(e);
      }
    }
    return null;
  }

  /**
   * Rollback path used when the root procedure does not support rollback. No procedure's
   * rollback logic is executed here: each sub procedure still present in {@code procedures} gets
   * {@link #cleanupAfterRollbackOneStep} and {@link #execCompletionCleanup}, and finally the root
   * procedure gets {@link #cleanupAfterRollbackOneStep}. Each step runs while holding the
   * per-procedure execution lock, unless the current thread already holds it.
   * @param rootProc  the root procedure being rolled back
   * @param procStack the shared state of {@code rootProc} and its sub procedures
   */
  private void executeUnexpectedRollback(Procedure<TEnvironment> rootProc,
    RootProcedureState<TEnvironment> procStack) {
    if (procStack.getSubprocs() != null) {
      // comparing proc id in reverse order, so we will delete later procedures first, otherwise we
      // may delete parent procedure first and if we fail in the middle of this operation, when
      // loading we will find some orphan procedures
      // NOTE(review): PriorityQueue rejects an initial capacity < 1, so this assumes
      // getSubprocs() is never a non-null but empty collection -- confirm.
      PriorityQueue<Procedure<TEnvironment>> pq =
        new PriorityQueue<>(procStack.getSubprocs().size(),
          Comparator.<Procedure<TEnvironment>> comparingLong(Procedure::getProcId).reversed());
      pq.addAll(procStack.getSubprocs());
      for (;;) {
        Procedure<TEnvironment> subproc = pq.poll();
        if (subproc == null) {
          // queue drained
          break;
        }
        if (!procedures.containsKey(subproc.getProcId())) {
          // this means it has already been rolledback
          continue;
        }
        // null means the current thread already holds the lock, so there is nothing to release
        IdLock.Entry lockEntry = getLockEntryForRollback(subproc.getProcId());
        try {
          cleanupAfterRollbackOneStep(subproc);
          execCompletionCleanup(subproc);
        } finally {
          if (lockEntry != null) {
            procExecutionLock.releaseLockEntry(lockEntry);
          }
        }
      }
    }
    // Sub procedures are done, now clean up the root procedure itself.
    IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId());
    try {
      cleanupAfterRollbackOneStep(rootProc);
    } finally {
      if (lockEntry != null) {
        procExecutionLock.releaseLockEntry(lockEntry);
      }
    }
  }

  /**
   * Rollback path used when the root procedure supports rollback. Walks the stack returned by
   * {@code procStack.getSubproceduresStack()} from its last element to its first, rolling back
   * each procedure that did not succeed and only cleaning up those that did.
   * @param rootProc  the root procedure being rolled back
   * @param procStack the shared state of {@code rootProc} and its sub procedures
   * @return {@link LockState#LOCK_ACQUIRED} when the loop ran to completion; otherwise the lock
   *         state that interrupted it (lock not acquired, rollback step asked to yield, or the
   *         procedure requested a yield after the step)
   */
  private LockState executeNormalRollback(Procedure<TEnvironment> rootProc,
    RootProcedureState<TEnvironment> procStack) {
    List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack();
    assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc;

    // Iterate from the tail; entries are removed from the list as they are processed.
    int stackTail = subprocStack.size();
    while (stackTail-- > 0) {
      Procedure<TEnvironment> proc = subprocStack.get(stackTail);
      // null means the current thread already holds the lock, so there is nothing to release
      IdLock.Entry lockEntry = getLockEntryForRollback(proc.getProcId());
      try {
        // For the sub procedures which are successfully finished, we do not rollback them.
        // Typically, if we want to rollback a procedure, we first need to rollback it, and then
        // recursively rollback its ancestors. The state changes which are done by sub procedures
        // should be handled by parent procedures when rolling back. For example, when rolling back
        // a MergeTableProcedure, we will schedule new procedures to bring the offline regions
        // online, instead of rolling back the original procedures which offlined the regions(in
        // fact these procedures can not be rolled back...).
        if (proc.isSuccess()) {
          // Just do the cleanup work, without actually executing the rollback
          subprocStack.remove(stackTail);
          cleanupAfterRollbackOneStep(proc);
          continue;
        }
        LockState lockState = acquireLock(proc);
        if (lockState != LockState.LOCK_ACQUIRED) {
          // can't take a lock on the procedure, add the root-proc back on the
          // queue waiting for the lock availability
          return lockState;
        }

        lockState = executeRollback(proc);
        releaseLock(proc, false);
        boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
        abortRollback |= !isRunning() || !store.isRunning();

        // allows to kill the executor before something is stored to the wal.
        // useful to test the procedure recovery.
        // NOTE(review): when the step succeeded but the executor/store is no longer running,
        // lockState is still LOCK_ACQUIRED here, which executeRollback(long, ...) treats as
        // "rollback complete" -- confirm this is intended.
        if (abortRollback) {
          return lockState;
        }

        // The step is done, drop it from the stack so it is not rolled back again.
        subprocStack.remove(stackTail);

        // if the procedure is kind enough to pass the slot to someone else, yield
        // if the proc is already finished, do not yield
        if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) {
          return LockState.LOCK_YIELD_WAIT;
        }

        // The root procedure's completion cleanup is left to the caller (rootProcedureFinished).
        if (proc != rootProc) {
          execCompletionCleanup(proc);
        }
      } finally {
        if (lockEntry != null) {
          procExecutionLock.releaseLockEntry(lockEntry);
        }
      }
    }
    return LockState.LOCK_ACQUIRED;
  }

  /**
   * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the
   * root-procedure will be visible as finished to user, and the result will be the fatal exception.
   * @param rootProcId id of the root procedure, looked up in {@code procedures}
   * @param procStack  the shared state of the root procedure and its sub procedures
   * @return {@link LockState#LOCK_ACQUIRED} once the root procedure has been finalized, or the
   *         non-acquired state returned by {@link #executeNormalRollback} when it has to be retried
   */
  private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) {
    Procedure<TEnvironment> rootProc = procedures.get(rootProcId);
    RemoteProcedureException exception = rootProc.getException();
    // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are
    // rolling back because the subprocedure does. Clarify.
    if (exception == null) {
      // Copy the exception recorded on the stack onto the root procedure and persist it.
      exception = procStack.getException();
      rootProc.setFailure(exception);
      store.update(rootProc);
    }

    if (procStack.isRollbackSupported()) {
      LockState lockState = executeNormalRollback(rootProc, procStack);
      if (lockState != LockState.LOCK_ACQUIRED) {
        return lockState;
      }
    } else {
      // the procedure does not support rollback, so typically we should not reach here, this
      // usually means there are code bugs, let's just wait all the subprocedures to finish and then
      // mark the root procedure as failure.
      LOG.error(HBaseMarkers.FATAL,
        "Root Procedure {} does not support rollback but the execution failed"
          + " and try to rollback, code bug?",
        rootProc, exception);
      executeUnexpectedRollback(rootProc, procStack);
    }

    // null means the current thread already holds the lock, so there is nothing to release
    IdLock.Entry lockEntry = getLockEntryForRollback(rootProc.getProcId());
    try {
      // Finalize the procedure state
      LOG.info("Rolled back {} exec-time={}", rootProc,
        StringUtils.humanTimeDiff(rootProc.elapsedTime()));
      rootProcedureFinished(rootProc);
    } finally {
      if (lockEntry != null) {
        procExecutionLock.releaseLockEntry(lockEntry);
      }
    }

    return LockState.LOCK_ACQUIRED;
  }

  /**
   * Persist the outcome of one rollback step for {@code proc}. When
   * {@code proc.removeStackIndex()} returns true the procedure is marked ROLLEDBACK (unless it
   * had succeeded), its finish metrics are updated, and it is either deleted from the store and
   * from {@code procedures} (child procedure) or persisted together with the deletion of its
   * recorded sub procedure ids (root procedure). Otherwise the procedure is simply updated in the
   * store.
   * @param proc the procedure whose rollback step has just completed
   */
  private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) {
    if (testing != null && testing.shouldKillBeforeStoreUpdateInRollback()) {
      kill("TESTING: Kill BEFORE store update in rollback: " + proc);
    }
    // presumably true means no stack index is left for this procedure -- TODO confirm
    if (proc.removeStackIndex()) {
      if (!proc.isSuccess()) {
        proc.setState(ProcedureState.ROLLEDBACK);
      }

      // update metrics on finishing the procedure (fail)
      proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false);

      if (proc.hasParent()) {
        // child procedure: remove it from both the store and the in-memory map
        store.delete(proc.getProcId());
        procedures.remove(proc.getProcId());
      } else {
        // root procedure: keep it, but drop any sub procedures still recorded for it
        final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds();
        if (childProcIds != null) {
          store.delete(proc, childProcIds);
        } else {
          store.update(proc);
        }
      }
    } else {
      store.update(proc);
    }
  }

  /**
   * Execute the rollback of the procedure step. It updates the store with the new state (stack
   * index) or will remove completely the procedure in case it is a child.
1750 */ 1751 private LockState executeRollback(Procedure<TEnvironment> proc) { 1752 try { 1753 proc.doRollback(getEnvironment()); 1754 } catch (IOException e) { 1755 LOG.debug("Roll back attempt failed for {}", proc, e); 1756 return LockState.LOCK_YIELD_WAIT; 1757 } catch (InterruptedException e) { 1758 handleInterruptedException(proc, e); 1759 return LockState.LOCK_YIELD_WAIT; 1760 } catch (Throwable e) { 1761 // Catch NullPointerExceptions or similar errors... 1762 LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); 1763 } 1764 1765 cleanupAfterRollbackOneStep(proc); 1766 1767 return LockState.LOCK_ACQUIRED; 1768 } 1769 1770 private void yieldProcedure(Procedure<TEnvironment> proc) { 1771 releaseLock(proc, false); 1772 scheduler.yield(proc); 1773 } 1774 1775 /** 1776 * Executes <code>procedure</code> 1777 * <ul> 1778 * <li>Calls the doExecute() of the procedure 1779 * <li>If the procedure execution didn't fail (i.e. valid user input) 1780 * <ul> 1781 * <li>...and returned subprocedures 1782 * <ul> 1783 * <li>The subprocedures are initialized. 
1784 * <li>The subprocedures are added to the store 1785 * <li>The subprocedures are added to the runnable queue 1786 * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete 1787 * </ul> 1788 * </li> 1789 * <li>...if there are no subprocedure 1790 * <ul> 1791 * <li>the procedure completed successfully 1792 * <li>if there is a parent (WAITING) 1793 * <li>the parent state will be set to RUNNABLE 1794 * </ul> 1795 * </li> 1796 * </ul> 1797 * </li> 1798 * <li>In case of failure 1799 * <ul> 1800 * <li>The store is updated with the new state</li> 1801 * <li>The executor (caller of this method) will start the rollback of the procedure</li> 1802 * </ul> 1803 * </li> 1804 * </ul> 1805 */ 1806 private void execProcedure(RootProcedureState<TEnvironment> procStack, 1807 Procedure<TEnvironment> procedure) { 1808 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 1809 "NOT RUNNABLE! " + procedure.toString()); 1810 1811 // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. 1812 // The exception is caught below and then we hurry to the exit without disturbing state. The 1813 // idea is that the processing of this procedure will be unsuspended later by an external event 1814 // such the report of a region open. 1815 boolean suspended = false; 1816 1817 // Whether to 're-' -execute; run through the loop again. 
1818 boolean reExecute = false; 1819 1820 Procedure<TEnvironment>[] subprocs = null; 1821 do { 1822 reExecute = false; 1823 procedure.resetPersistence(); 1824 try { 1825 subprocs = procedure.doExecute(getEnvironment()); 1826 if (subprocs != null && subprocs.length == 0) { 1827 subprocs = null; 1828 } 1829 } catch (ProcedureSuspendedException e) { 1830 LOG.trace("Suspend {}", procedure); 1831 suspended = true; 1832 } catch (ProcedureYieldException e) { 1833 LOG.trace("Yield {}", procedure, e); 1834 yieldProcedure(procedure); 1835 return; 1836 } catch (InterruptedException e) { 1837 LOG.trace("Yield interrupt {}", procedure, e); 1838 handleInterruptedException(procedure, e); 1839 yieldProcedure(procedure); 1840 return; 1841 } catch (Throwable e) { 1842 // Catch NullPointerExceptions or similar errors... 1843 String msg = "CODE-BUG: Uncaught runtime exception: " + procedure; 1844 LOG.error(msg, e); 1845 procedure.setFailure(new RemoteProcedureException(msg, e)); 1846 } 1847 1848 if (!procedure.isFailed()) { 1849 if (subprocs != null) { 1850 if (subprocs.length == 1 && subprocs[0] == procedure) { 1851 // Procedure returned itself. Quick-shortcut for a state machine-like procedure; 1852 // i.e. we go around this loop again rather than go back out on the scheduler queue. 1853 subprocs = null; 1854 reExecute = true; 1855 LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); 1856 } else { 1857 // Yield the current procedure, and make the subprocedure runnable 1858 // subprocs may come back 'null'. 1859 subprocs = initializeChildren(procStack, procedure, subprocs); 1860 LOG.info("Initialized subprocedures=" + (subprocs == null 1861 ? 
null 1862 : Stream.of(subprocs).map(e -> "{" + e.toString() + "}").collect(Collectors.toList()) 1863 .toString())); 1864 } 1865 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1866 LOG.trace("Added to timeoutExecutor {}", procedure); 1867 timeoutExecutor.add(procedure); 1868 } else if (!suspended) { 1869 // No subtask, so we are done 1870 procedure.setState(ProcedureState.SUCCESS); 1871 } 1872 } 1873 1874 // allows to kill the executor before something is stored to the wal. 1875 // useful to test the procedure recovery. 1876 if ( 1877 testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent()) 1878 ) { 1879 kill("TESTING: Kill BEFORE store update: " + procedure); 1880 } 1881 1882 // TODO: The code here doesn't check if store is running before persisting to the store as 1883 // it relies on the method call below to throw RuntimeException to wind up the stack and 1884 // executor thread to stop. The statement following the method call below seems to check if 1885 // store is not running, to prevent scheduling children procedures, re-execution or yield 1886 // of this procedure. This may need more scrutiny and subsequent cleanup in future 1887 // 1888 // Commit the transaction even if a suspend (state may have changed). Note this append 1889 // can take a bunch of time to complete. 
1890 if (procedure.needPersistence()) { 1891 // Add the procedure to the stack 1892 // See HBASE-28210 on why we need synchronized here 1893 boolean needUpdateStoreOutsideLock = false; 1894 synchronized (procStack) { 1895 if (procStack.addRollbackStep(procedure)) { 1896 updateStoreOnExec(procStack, procedure, subprocs); 1897 } else { 1898 needUpdateStoreOutsideLock = true; 1899 } 1900 } 1901 // this is an optimization if we do not need to maintain rollback step, as all subprocedures 1902 // of the same root procedure share the same root procedure state, if we can only update 1903 // store under the above lock, the sub procedures of the same root procedure can only be 1904 // persistent sequentially, which will have a bad performance. See HBASE-28212 for more 1905 // details. 1906 if (needUpdateStoreOutsideLock) { 1907 updateStoreOnExec(procStack, procedure, subprocs); 1908 } 1909 } 1910 1911 // if the store is not running we are aborting 1912 if (!store.isRunning()) { 1913 return; 1914 } 1915 // if the procedure is kind enough to pass the slot to someone else, yield 1916 if ( 1917 procedure.isRunnable() && !suspended 1918 && procedure.isYieldAfterExecutionStep(getEnvironment()) 1919 ) { 1920 yieldProcedure(procedure); 1921 return; 1922 } 1923 1924 assert (reExecute && subprocs == null) || !reExecute; 1925 } while (reExecute); 1926 1927 // Allows to kill the executor after something is stored to the WAL but before the below 1928 // state settings are done -- in particular the one on the end where we make parent 1929 // RUNNABLE again when its children are done; see countDownChildren. 
1930 if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) { 1931 kill("TESTING: Kill AFTER store update: " + procedure); 1932 } 1933 1934 // Submit the new subprocedures 1935 if (subprocs != null && !procedure.isFailed()) { 1936 submitChildrenProcedures(subprocs); 1937 } 1938 1939 // we need to log the release lock operation before waking up the parent procedure, as there 1940 // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all 1941 // the sub procedures from store and cause problems... 1942 releaseLock(procedure, false); 1943 1944 // if the procedure is complete and has a parent, count down the children latch. 1945 // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 1946 if (!suspended && procedure.isFinished() && procedure.hasParent()) { 1947 countDownChildren(procStack, procedure); 1948 } 1949 } 1950 1951 private void kill(String msg) { 1952 LOG.debug(msg); 1953 stop(); 1954 throw new RuntimeException(msg); 1955 } 1956 1957 private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, 1958 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1959 assert subprocs != null : "expected subprocedures"; 1960 final long rootProcId = getRootProcedureId(procedure); 1961 for (int i = 0; i < subprocs.length; ++i) { 1962 Procedure<TEnvironment> subproc = subprocs[i]; 1963 if (subproc == null) { 1964 String msg = "subproc[" + i + "] is null, aborting the procedure"; 1965 procedure 1966 .setFailure(new RemoteProcedureException(msg, new IllegalArgumentIOException(msg))); 1967 return null; 1968 } 1969 1970 assert subproc.getState() == ProcedureState.INITIALIZING : subproc; 1971 subproc.setParentProcId(procedure.getProcId()); 1972 subproc.setRootProcId(rootProcId); 1973 subproc.setProcId(nextProcId()); 1974 procStack.addSubProcedure(subproc); 1975 } 1976 1977 if (!procedure.isFailed()) { 1978 
procedure.setChildrenLatch(subprocs.length); 1979 switch (procedure.getState()) { 1980 case RUNNABLE: 1981 procedure.setState(ProcedureState.WAITING); 1982 break; 1983 case WAITING_TIMEOUT: 1984 timeoutExecutor.add(procedure); 1985 break; 1986 default: 1987 break; 1988 } 1989 } 1990 return subprocs; 1991 } 1992 1993 private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { 1994 for (int i = 0; i < subprocs.length; ++i) { 1995 Procedure<TEnvironment> subproc = subprocs[i]; 1996 subproc.updateMetricsOnSubmit(getEnvironment()); 1997 assert !procedures.containsKey(subproc.getProcId()); 1998 procedures.put(subproc.getProcId(), subproc); 1999 scheduler.addFront(subproc); 2000 } 2001 } 2002 2003 private void countDownChildren(RootProcedureState<TEnvironment> procStack, 2004 Procedure<TEnvironment> procedure) { 2005 Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); 2006 if (parent == null) { 2007 assert procStack.isRollingback(); 2008 return; 2009 } 2010 2011 // If this procedure is the last child awake the parent procedure 2012 if (parent.tryRunnable()) { 2013 // If we succeeded in making the parent runnable -- i.e. all of its 2014 // children have completed, move parent to front of the queue. 
2015 store.update(parent); 2016 scheduler.addFront(parent); 2017 LOG.info("Finished subprocedure pid={}, resume processing ppid={}", procedure.getProcId(), 2018 parent.getProcId()); 2019 return; 2020 } 2021 } 2022 2023 private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, 2024 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 2025 if (subprocs != null && !procedure.isFailed()) { 2026 if (LOG.isTraceEnabled()) { 2027 LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); 2028 } 2029 store.insert(procedure, subprocs); 2030 } else { 2031 LOG.trace("Store update {}", procedure); 2032 if (procedure.isFinished() && !procedure.hasParent()) { 2033 // remove child procedures 2034 final long[] childProcIds = procStack.getSubprocedureIds(); 2035 if (childProcIds != null) { 2036 store.delete(procedure, childProcIds); 2037 for (int i = 0; i < childProcIds.length; ++i) { 2038 procedures.remove(childProcIds[i]); 2039 } 2040 } else { 2041 store.update(procedure); 2042 } 2043 } else { 2044 store.update(procedure); 2045 } 2046 } 2047 } 2048 2049 private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { 2050 LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); 2051 // NOTE: We don't call Thread.currentThread().interrupt() 2052 // because otherwise all the subsequent calls e.g. Thread.sleep() will throw 2053 // the InterruptedException. If the master is going down, we will be notified 2054 // and the executor/store will be stopped. 
2055 // (The interrupted procedure will be retried on the next run) 2056 } 2057 2058 private void execCompletionCleanup(Procedure<TEnvironment> proc) { 2059 final TEnvironment env = getEnvironment(); 2060 if (proc.hasLock()) { 2061 LOG.warn("Usually this should not happen, we will release the lock before if the procedure" 2062 + " is finished, even if the holdLock is true, arrive here means we have some holes where" 2063 + " we do not release the lock. And the releaseLock below may fail since the procedure may" 2064 + " have already been deleted from the procedure store."); 2065 releaseLock(proc, true); 2066 } 2067 try { 2068 proc.completionCleanup(env); 2069 } catch (Throwable e) { 2070 // Catch NullPointerExceptions or similar errors... 2071 LOG.error("CODE-BUG: uncaught runtime exception for procedure: " + proc, e); 2072 } 2073 2074 // call schedulers completion cleanup, we have some special clean up logic in this method so if 2075 // it throws any exceptions, we can not just ignore it like the above procedure's cleanup 2076 scheduler.completionCleanup(proc); 2077 } 2078 2079 private void rootProcedureFinished(Procedure<TEnvironment> proc) { 2080 // call the procedure completion cleanup handler 2081 execCompletionCleanup(proc); 2082 2083 CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); 2084 2085 // update the executor internal state maps 2086 if (!proc.shouldWaitClientAck(getEnvironment())) { 2087 retainer.setClientAckTime(0); 2088 } 2089 2090 completed.put(proc.getProcId(), retainer); 2091 rollbackStack.remove(proc.getProcId()); 2092 procedures.remove(proc.getProcId()); 2093 2094 // Notify the listeners 2095 sendProcedureFinishedNotification(proc.getProcId()); 2096 } 2097 2098 RootProcedureState<TEnvironment> getProcStack(long rootProcId) { 2099 return rollbackStack.get(rootProcId); 2100 } 2101 2102 ProcedureScheduler getProcedureScheduler() { 2103 return scheduler; 2104 } 2105 2106 int getCompletedSize() { 2107 return 
completed.size(); 2108 } 2109 2110 public IdLock getProcExecutionLock() { 2111 return procExecutionLock; 2112 } 2113 2114 /** 2115 * Get a thread pool for executing some asynchronous tasks 2116 */ 2117 public ExecutorService getAsyncTaskExecutor() { 2118 return asyncTaskExecutor; 2119 } 2120 2121 // ========================================================================== 2122 // Worker Thread 2123 // ========================================================================== 2124 private class WorkerThread extends StoppableThread { 2125 private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); 2126 private volatile Procedure<TEnvironment> activeProcedure; 2127 2128 public WorkerThread(ThreadGroup group) { 2129 this(group, "PEWorker-"); 2130 } 2131 2132 protected WorkerThread(ThreadGroup group, String prefix) { 2133 super(group, prefix + workerId.incrementAndGet()); 2134 setDaemon(true); 2135 } 2136 2137 @Override 2138 public void sendStopSignal() { 2139 scheduler.signalAll(); 2140 } 2141 2142 /** 2143 * Encapsulates execution of the current {@link #activeProcedure} for easy tracing. 
2144 */ 2145 private long runProcedure() throws IOException { 2146 final Procedure<TEnvironment> proc = this.activeProcedure; 2147 int activeCount = activeExecutorCount.incrementAndGet(); 2148 int runningCount = store.setRunningProcedureCount(activeCount); 2149 LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, 2150 activeCount); 2151 executionStartTime.set(EnvironmentEdgeManager.currentTime()); 2152 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 2153 try { 2154 executeProcedure(proc); 2155 } catch (AssertionError e) { 2156 LOG.info("ASSERT pid=" + proc.getProcId(), e); 2157 throw e; 2158 } finally { 2159 procExecutionLock.releaseLockEntry(lockEntry); 2160 activeCount = activeExecutorCount.decrementAndGet(); 2161 runningCount = store.setRunningProcedureCount(activeCount); 2162 LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, 2163 activeCount); 2164 this.activeProcedure = null; 2165 executionStartTime.set(Long.MAX_VALUE); 2166 } 2167 return EnvironmentEdgeManager.currentTime(); 2168 } 2169 2170 @Override 2171 public void run() { 2172 long lastUpdate = EnvironmentEdgeManager.currentTime(); 2173 try { 2174 while (isRunning() && keepAlive(lastUpdate)) { 2175 @SuppressWarnings("unchecked") 2176 Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); 2177 if (proc == null) { 2178 continue; 2179 } 2180 this.activeProcedure = proc; 2181 lastUpdate = TraceUtil.trace(this::runProcedure, new ProcedureSpanBuilder(proc)); 2182 } 2183 } catch (Throwable t) { 2184 LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); 2185 } finally { 2186 LOG.trace("Worker terminated."); 2187 } 2188 workerThreads.remove(this); 2189 } 2190 2191 @Override 2192 public String toString() { 2193 Procedure<?> p = this.activeProcedure; 2194 return getName() + "(pid=" + (p == null ? 
Procedure.NO_PROC_ID : p.getProcId() + ")"); 2195 } 2196 2197 /** Returns the time since the current procedure is running */ 2198 public long getCurrentRunTime() { 2199 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); 2200 } 2201 2202 // core worker never timeout 2203 protected boolean keepAlive(long lastUpdate) { 2204 return true; 2205 } 2206 } 2207 2208 // A worker thread which can be added when core workers are stuck. Will timeout after 2209 // keepAliveTime if there is no procedure to run. 2210 private final class KeepAliveWorkerThread extends WorkerThread { 2211 public KeepAliveWorkerThread(ThreadGroup group) { 2212 super(group, "KeepAlivePEWorker-"); 2213 } 2214 2215 @Override 2216 protected boolean keepAlive(long lastUpdate) { 2217 return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; 2218 } 2219 } 2220 2221 // ---------------------------------------------------------------------------- 2222 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the 2223 // full set of procedures pending and completed to write a compacted 2224 // version of the log (in case is a log)? 2225 // In theory no, procedures are have a short life, so at some point the store 2226 // will have the tracker saying everything is in the last log. 
2227 // ---------------------------------------------------------------------------- 2228 2229 private final class WorkerMonitor extends InlineChore { 2230 public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = 2231 "hbase.procedure.worker.monitor.interval.msec"; 2232 private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec 2233 2234 public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = 2235 "hbase.procedure.worker.stuck.threshold.msec"; 2236 private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec 2237 2238 public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = 2239 "hbase.procedure.worker.add.stuck.percentage"; 2240 private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck 2241 2242 private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; 2243 private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; 2244 private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; 2245 2246 public WorkerMonitor() { 2247 refreshConfig(); 2248 } 2249 2250 @Override 2251 public void run() { 2252 final int stuckCount = checkForStuckWorkers(); 2253 checkThreadCount(stuckCount); 2254 2255 // refresh interval (poor man dynamic conf update) 2256 refreshConfig(); 2257 } 2258 2259 private int checkForStuckWorkers() { 2260 // check if any of the worker is stuck 2261 int stuckCount = 0; 2262 for (WorkerThread worker : workerThreads) { 2263 if (worker.getCurrentRunTime() < stuckThreshold) { 2264 continue; 2265 } 2266 2267 // WARN the worker is stuck 2268 stuckCount++; 2269 LOG.warn("Worker stuck {}, run time {}", worker, 2270 StringUtils.humanTimeDiff(worker.getCurrentRunTime())); 2271 } 2272 return stuckCount; 2273 } 2274 2275 private void checkThreadCount(final int stuckCount) { 2276 // nothing to do if there are no runnable tasks 2277 if (stuckCount < 1 || !scheduler.hasRunnables()) { 2278 return; 2279 } 2280 2281 // add a new thread if the worker stuck percentage exceed the 
threshold limit 2282 // and every handler is active. 2283 final float stuckPerc = ((float) stuckCount) / workerThreads.size(); 2284 // let's add new worker thread more aggressively, as they will timeout finally if there is no 2285 // work to do. 2286 if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) { 2287 final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup); 2288 workerThreads.add(worker); 2289 worker.start(); 2290 LOG.debug("Added new worker thread {}", worker); 2291 } 2292 } 2293 2294 private void refreshConfig() { 2295 addWorkerStuckPercentage = 2296 conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); 2297 timeoutInterval = 2298 conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, DEFAULT_WORKER_MONITOR_INTERVAL); 2299 stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, DEFAULT_WORKER_STUCK_THRESHOLD); 2300 } 2301 2302 @Override 2303 public int getTimeoutInterval() { 2304 return timeoutInterval; 2305 } 2306 } 2307}