001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import edu.umd.cs.findbugs.annotations.Nullable; 021import java.io.IOException; 022import java.io.UncheckedIOException; 023import java.util.ArrayDeque; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Deque; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Set; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.CopyOnWriteArrayList; 033import java.util.concurrent.Executor; 034import java.util.concurrent.Executors; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.concurrent.atomic.AtomicLong; 039import java.util.stream.Collectors; 040import java.util.stream.Stream; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 044import org.apache.hadoop.hbase.log.HBaseMarkers; 045import org.apache.hadoop.hbase.procedure2.Procedure.LockState; 
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 049import org.apache.hadoop.hbase.procedure2.trace.ProcedureSpanBuilder; 050import org.apache.hadoop.hbase.procedure2.util.StringUtils; 051import org.apache.hadoop.hbase.security.User; 052import org.apache.hadoop.hbase.trace.TraceUtil; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.IdLock; 055import org.apache.hadoop.hbase.util.NonceKey; 056import org.apache.hadoop.hbase.util.Threads; 057import org.apache.yetus.audience.InterfaceAudience; 058import org.slf4j.Logger; 059import org.slf4j.LoggerFactory; 060 061import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 062import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 063 064import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 065 066/** 067 * Thread Pool that executes the submitted procedures. The executor has a ProcedureStore associated. 068 * Each operation is logged and on restart the pending procedures are resumed. Unless the Procedure 069 * code throws an error (e.g. invalid user input) the procedure will complete (at some point in 070 * time), On restart the pending procedures are resumed and the once failed will be rolledback. 
The 071 * user can add procedures to the executor via submitProcedure(proc) check for the finished state 072 * via isFinished(procId) and get the result via getResult(procId) 073 */ 074@InterfaceAudience.Private 075public class ProcedureExecutor<TEnvironment> { 076 private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); 077 078 public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; 079 private static final boolean DEFAULT_CHECK_OWNER_SET = false; 080 081 public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = 082 "hbase.procedure.worker.keep.alive.time.msec"; 083 private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); 084 085 public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; 086 static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min 087 088 public static final String EVICT_ACKED_TTL_CONF_KEY = "hbase.procedure.cleaner.acked.evict.ttl"; 089 static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min 090 091 /** 092 * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to break PE 093 * having it fail at various junctures. When non-null, testing is set to an instance of the below 094 * internal {@link Testing} class with flags set for the particular test. 095 */ 096 volatile Testing testing = null; 097 098 /** 099 * Class with parameters describing how to fail/die when in testing-context. 100 */ 101 public static class Testing { 102 protected volatile boolean killIfHasParent = true; 103 protected volatile boolean killIfSuspended = false; 104 105 /** 106 * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is 107 * persisting all the state it needs to recover after a crash. 
108 */ 109 protected volatile boolean killBeforeStoreUpdate = false; 110 protected volatile boolean toggleKillBeforeStoreUpdate = false; 111 112 /** 113 * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978 114 * is about a case where memory-state was being set after store to WAL where a crash could cause 115 * us to get stuck. This flag allows killing at what was a vulnerable time. 116 */ 117 protected volatile boolean killAfterStoreUpdate = false; 118 protected volatile boolean toggleKillAfterStoreUpdate = false; 119 120 protected boolean shouldKillBeforeStoreUpdate() { 121 final boolean kill = this.killBeforeStoreUpdate; 122 if (this.toggleKillBeforeStoreUpdate) { 123 this.killBeforeStoreUpdate = !kill; 124 LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); 125 } 126 return kill; 127 } 128 129 protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) { 130 if (isSuspended && !killIfSuspended) { 131 return false; 132 } 133 if (hasParent && !killIfHasParent) { 134 return false; 135 } 136 return shouldKillBeforeStoreUpdate(); 137 } 138 139 protected boolean shouldKillAfterStoreUpdate() { 140 final boolean kill = this.killAfterStoreUpdate; 141 if (this.toggleKillAfterStoreUpdate) { 142 this.killAfterStoreUpdate = !kill; 143 LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate); 144 } 145 return kill; 146 } 147 148 protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { 149 return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate(); 150 } 151 } 152 153 public interface ProcedureExecutorListener { 154 void procedureLoaded(long procId); 155 156 void procedureAdded(long procId); 157 158 void procedureFinished(long procId); 159 } 160 161 /** 162 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. 
   * Once a Root-Procedure completes (success or failure), the result will be added to this map.
   * The user of ProcedureExecutor should call getResult(procId) to get the result.
   */
  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
    new ConcurrentHashMap<>();

  /**
   * Maps the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
   * The RootProcedureState contains the execution stack of the Root-Procedure. It is added to the
   * map by submitProcedure() and removed on procedure completion.
   */
  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up the live procedures by ID. This map contains every procedure:
   * root-procedures and subprocedures.
   */
  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up whether the procedure was already issued by the same client. This map
   * contains every root procedure. A procId may be reserved here before the procedure itself is
   * submitted.
   */
  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

  // Registered ProcedureExecutorListeners; presumably notified by the send*Notification helpers
  // (not visible here) — TODO confirm.
  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
    new CopyOnWriteArrayList<>();

  // Current configuration; replaced by refreshConfiguration() and read when force-updating
  // procedures and when creating the completed-procedure cleaner.
  private Configuration conf;

  /**
   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private ThreadGroup threadGroup;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected).
   * Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private CopyOnWriteArrayList<WorkerThread> workerThreads;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected). Overridden when we do the
   * ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery (Should be ok).
   */
  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

  /**
   * Dedicated executor for the WorkerMonitor chore. WorkerMonitor checks for stuck workers and
   * starts new worker threads when necessary (for example, if there is no worker available to
   * assign meta), so it is very important. The TimeoutExecutor runs many tasks, like
   * DeadServerMetricRegionChore and RegionInTransitionChore. Some of them may run for a long time
   * and would block other tasks such as WorkerMonitor, so WorkerMonitor gets its own thread.
   */
  private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

  // Set by init(): corePoolSize = numThreads, maxPoolSize = 10 * numThreads.
  private int corePoolSize;
  private int maxPoolSize;

  // Stored in milliseconds (setKeepAliveTime converts from the caller's TimeUnit).
  private volatile long keepAliveTime;

  /**
   * Scheduler/Queue that contains runnable procedures.
   */
  private final ProcedureScheduler scheduler;

  // Single daemon thread that runs store-requested force updates off the caller's thread.
  // NOTE(review): this executor is never shut down by stop()/join().
  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

  // -1 until the store loader calls setMaxProcId(); reset to -1 by join().
  private final AtomicLong lastProcId = new AtomicLong(-1);
  // Reset to 0 by init() before the workers are created; presumably used to number worker
  // threads — TODO confirm.
  private final AtomicLong workerId = new AtomicLong(0);
  // Exposed via getActiveExecutorCount().
  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
  // Flipped to true by startWorkers() and back to false by stop().
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final TEnvironment environment;
  private final ProcedureStore store;

  // Read once in the constructor from CHECK_OWNER_SET_CONF_KEY.
  private final boolean checkOwnerSet;

  // To prevent concurrent execution of the same procedure.
248 // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the 249 // procedure is woken up before we finish the suspend which causes the same procedures to be 250 // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also 251 // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent 252 // execution of the same procedure. 253 private final IdLock procExecutionLock = new IdLock(); 254 255 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 256 final ProcedureStore store) { 257 this(conf, environment, store, new SimpleProcedureScheduler()); 258 } 259 260 private boolean isRootFinished(Procedure<?> proc) { 261 Procedure<?> rootProc = procedures.get(proc.getRootProcId()); 262 return rootProc == null || rootProc.isFinished(); 263 } 264 265 private void forceUpdateProcedure(long procId) throws IOException { 266 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); 267 try { 268 Procedure<TEnvironment> proc = procedures.get(procId); 269 if (proc != null) { 270 if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) { 271 LOG.debug("Procedure {} has already been finished and parent is succeeded," 272 + " skip force updating", proc); 273 return; 274 } 275 } else { 276 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 277 if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) { 278 LOG.debug("No pending procedure with id = {}, skip force updating.", procId); 279 return; 280 } 281 long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL); 282 long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL); 283 if (retainer.isExpired(EnvironmentEdgeManager.currentTime(), evictTtl, evictAckTtl)) { 284 LOG.debug("Procedure {} has already been finished and expired, skip force updating", 285 procId); 286 return; 287 } 288 proc = 
retainer.getProcedure(); 289 } 290 LOG.debug("Force update procedure {}", proc); 291 store.update(proc); 292 } finally { 293 procExecutionLock.releaseLockEntry(lockEntry); 294 } 295 } 296 297 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 298 final ProcedureStore store, final ProcedureScheduler scheduler) { 299 this.environment = environment; 300 this.scheduler = scheduler; 301 this.store = store; 302 this.conf = conf; 303 this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); 304 refreshConfiguration(conf); 305 store.registerListener(new ProcedureStoreListener() { 306 307 @Override 308 public void forceUpdate(long[] procIds) { 309 Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> { 310 try { 311 forceUpdateProcedure(procId); 312 } catch (IOException e) { 313 LOG.warn("Failed to force update procedure with pid={}", procId); 314 } 315 })); 316 } 317 }); 318 } 319 320 private void load(final boolean abortOnCorruption) throws IOException { 321 Preconditions.checkArgument(completed.isEmpty(), "completed not empty"); 322 Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty"); 323 Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty"); 324 Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty"); 325 326 store.load(new ProcedureStore.ProcedureLoader() { 327 @Override 328 public void setMaxProcId(long maxProcId) { 329 assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()"; 330 lastProcId.set(maxProcId); 331 } 332 333 @Override 334 public void load(ProcedureIterator procIter) throws IOException { 335 loadProcedures(procIter, abortOnCorruption); 336 } 337 338 @Override 339 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 340 int corruptedCount = 0; 341 while (procIter.hasNext()) { 342 Procedure<?> proc = procIter.next(); 343 LOG.error("Corrupt " + proc); 344 
corruptedCount++; 345 } 346 if (abortOnCorruption && corruptedCount > 0) { 347 throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay"); 348 } 349 } 350 }); 351 } 352 353 private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) { 354 proc.restoreLock(getEnvironment()); 355 restored.add(proc.getProcId()); 356 } 357 358 private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) { 359 while (!stack.isEmpty()) { 360 restoreLock(stack.pop(), restored); 361 } 362 } 363 364 // Restore the locks for all the procedures. 365 // Notice that we need to restore the locks starting from the root proc, otherwise there will be 366 // problem that a sub procedure may hold the exclusive lock first and then we are stuck when 367 // calling the acquireLock method for the parent procedure. 368 // The algorithm is straight-forward: 369 // 1. Use a set to record the procedures which locks have already been restored. 370 // 2. Use a stack to store the hierarchy of the procedures 371 // 3. For all the procedure, we will first try to find its parent and push it into the stack, 372 // unless 373 // a. We have no parent, i.e, we are the root procedure 374 // b. The lock has already been restored(by checking the set introduced in #1) 375 // then we start to pop the stack and call acquireLock for each procedure. 376 // Notice that this should be done for all procedures, not only the ones in runnableList. 
377 private void restoreLocks() { 378 Set<Long> restored = new HashSet<>(); 379 Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>(); 380 procedures.values().forEach(proc -> { 381 for (;;) { 382 if (restored.contains(proc.getProcId())) { 383 restoreLocks(stack, restored); 384 return; 385 } 386 if (!proc.hasParent()) { 387 restoreLock(proc, restored); 388 restoreLocks(stack, restored); 389 return; 390 } 391 stack.push(proc); 392 proc = procedures.get(proc.getParentProcId()); 393 } 394 }); 395 } 396 397 private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption) 398 throws IOException { 399 // 1. Build the rollback stack 400 int runnableCount = 0; 401 int failedCount = 0; 402 int waitingCount = 0; 403 int waitingTimeoutCount = 0; 404 while (procIter.hasNext()) { 405 boolean finished = procIter.isNextFinished(); 406 @SuppressWarnings("unchecked") 407 Procedure<TEnvironment> proc = procIter.next(); 408 NonceKey nonceKey = proc.getNonceKey(); 409 long procId = proc.getProcId(); 410 411 if (finished) { 412 completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc)); 413 LOG.debug("Completed {}", proc); 414 } else { 415 if (!proc.hasParent()) { 416 assert !proc.isFinished() : "unexpected finished procedure"; 417 rollbackStack.put(proc.getProcId(), new RootProcedureState<>()); 418 } 419 420 // add the procedure to the map 421 proc.beforeReplay(getEnvironment()); 422 procedures.put(proc.getProcId(), proc); 423 switch (proc.getState()) { 424 case RUNNABLE: 425 runnableCount++; 426 break; 427 case FAILED: 428 failedCount++; 429 break; 430 case WAITING: 431 waitingCount++; 432 break; 433 case WAITING_TIMEOUT: 434 waitingTimeoutCount++; 435 break; 436 default: 437 break; 438 } 439 } 440 441 if (nonceKey != null) { 442 nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map 443 } 444 } 445 446 // 2. 
Initialize the stacks: In the old implementation, for procedures in FAILED state, we will 447 // push it into the ProcedureScheduler directly to execute the rollback. But this does not work 448 // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove 449 // the queue from runQueue in scheduler, and then when a procedure which has lock access, for 450 // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler, 451 // we will add the queue back to let the workers poll from it. The assumption here is that, the 452 // procedure which has the xlock should have been polled out already, so when loading we can not 453 // add the procedure to scheduler first and then call acquireLock, since the procedure is still 454 // in the queue, and since we will remove the queue from runQueue, then no one can poll it out, 455 // then there is a dead lock 456 List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount); 457 List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount); 458 List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount); 459 List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount); 460 procIter.reset(); 461 while (procIter.hasNext()) { 462 if (procIter.isNextFinished()) { 463 procIter.skipNext(); 464 continue; 465 } 466 467 @SuppressWarnings("unchecked") 468 Procedure<TEnvironment> proc = procIter.next(); 469 assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc; 470 LOG.debug("Loading {}", proc); 471 Long rootProcId = getRootProcedureId(proc); 472 // The orphan procedures will be passed to handleCorrupted, so add an assert here 473 assert rootProcId != null; 474 475 if (proc.hasParent()) { 476 Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId()); 477 if (parent != null && !proc.isFinished()) { 478 parent.incChildrenLatch(); 479 } 480 } 481 482 
RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 483 procStack.loadStack(proc); 484 485 proc.setRootProcId(rootProcId); 486 switch (proc.getState()) { 487 case RUNNABLE: 488 runnableList.add(proc); 489 break; 490 case WAITING: 491 waitingList.add(proc); 492 break; 493 case WAITING_TIMEOUT: 494 waitingTimeoutList.add(proc); 495 break; 496 case FAILED: 497 failedList.add(proc); 498 break; 499 case ROLLEDBACK: 500 case INITIALIZING: 501 String msg = "Unexpected " + proc.getState() + " state for " + proc; 502 LOG.error(msg); 503 throw new UnsupportedOperationException(msg); 504 default: 505 break; 506 } 507 } 508 509 // 3. Check the waiting procedures to see if some of them can be added to runnable. 510 waitingList.forEach(proc -> { 511 if (!proc.hasChildren()) { 512 // Normally, WAITING procedures should be waken by its children. But, there is a case that, 513 // all the children are successful and before they can wake up their parent procedure, the 514 // master was killed. So, during recovering the procedures from ProcedureWal, its children 515 // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING 516 // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a 517 // exception will throw: 518 // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 519 // "NOT RUNNABLE! " + procedure.toString()); 520 proc.setState(ProcedureState.RUNNABLE); 521 runnableList.add(proc); 522 } else { 523 proc.afterReplay(getEnvironment()); 524 } 525 }); 526 // 4. restore locks 527 restoreLocks(); 528 529 // 5. Push the procedures to the timeout executor 530 waitingTimeoutList.forEach(proc -> { 531 proc.afterReplay(getEnvironment()); 532 timeoutExecutor.add(proc); 533 }); 534 535 // 6. 
Push the procedure to the scheduler 536 failedList.forEach(scheduler::addBack); 537 runnableList.forEach(p -> { 538 p.afterReplay(getEnvironment()); 539 if (!p.hasParent()) { 540 sendProcedureLoadedNotification(p.getProcId()); 541 } 542 scheduler.addBack(p); 543 }); 544 // After all procedures put into the queue, signal the worker threads. 545 // Otherwise, there is a race condition. See HBASE-21364. 546 scheduler.signalAll(); 547 } 548 549 /** 550 * Initialize the procedure executor, but do not start workers. We will start them later. 551 * <p/> 552 * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and 553 * ensure a single executor, and start the procedure replay to resume and recover the previous 554 * pending and in-progress procedures. 555 * @param numThreads number of threads available for procedure execution. 556 * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure 557 * is found on replay. otherwise false. 558 */ 559 public void init(int numThreads, boolean abortOnCorruption) throws IOException { 560 // We have numThreads executor + one timer thread used for timing out 561 // procedures and triggering periodic procedures. 562 this.corePoolSize = numThreads; 563 this.maxPoolSize = 10 * numThreads; 564 LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}", 565 corePoolSize, maxPoolSize); 566 567 this.threadGroup = new ThreadGroup("PEWorkerGroup"); 568 this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout"); 569 this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor"); 570 571 // Create the workers 572 workerId.set(0); 573 workerThreads = new CopyOnWriteArrayList<>(); 574 for (int i = 0; i < corePoolSize; ++i) { 575 workerThreads.add(new WorkerThread(threadGroup)); 576 } 577 578 long st, et; 579 580 // Acquire the store lease. 
581 st = System.nanoTime(); 582 store.recoverLease(); 583 et = System.nanoTime(); 584 LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(), 585 StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); 586 587 // start the procedure scheduler 588 scheduler.start(); 589 590 // TODO: Split in two steps. 591 // TODO: Handle corrupted procedures (currently just a warn) 592 // The first one will make sure that we have the latest id, 593 // so we can start the threads and accept new procedures. 594 // The second step will do the actual load of old procedures. 595 st = System.nanoTime(); 596 load(abortOnCorruption); 597 et = System.nanoTime(); 598 LOG.info("Loaded {} in {}", store.getClass().getSimpleName(), 599 StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); 600 } 601 602 /** 603 * Start the workers. 604 */ 605 public void startWorkers() throws IOException { 606 if (!running.compareAndSet(false, true)) { 607 LOG.warn("Already running"); 608 return; 609 } 610 // Start the executors. Here we must have the lastProcId set. 
611 LOG.trace("Start workers {}", workerThreads.size()); 612 timeoutExecutor.start(); 613 workerMonitorExecutor.start(); 614 for (WorkerThread worker : workerThreads) { 615 worker.start(); 616 } 617 618 // Internal chores 619 workerMonitorExecutor.add(new WorkerMonitor()); 620 621 // Add completed cleaner chore 622 addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed, 623 nonceKeysToProcIdsMap)); 624 } 625 626 public void stop() { 627 if (!running.getAndSet(false)) { 628 return; 629 } 630 631 LOG.info("Stopping"); 632 scheduler.stop(); 633 timeoutExecutor.sendStopSignal(); 634 workerMonitorExecutor.sendStopSignal(); 635 } 636 637 public void join() { 638 assert !isRunning() : "expected not running"; 639 640 // stop the timeout executor 641 timeoutExecutor.awaitTermination(); 642 // stop the work monitor executor 643 workerMonitorExecutor.awaitTermination(); 644 645 // stop the worker threads 646 for (WorkerThread worker : workerThreads) { 647 worker.awaitTermination(); 648 } 649 650 // Destroy the Thread Group for the executors 651 // TODO: Fix. #join is not place to destroy resources. 652 try { 653 threadGroup.destroy(); 654 } catch (IllegalThreadStateException e) { 655 LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", this.threadGroup, 656 e.getMessage()); 657 // This dumps list of threads on STDOUT. 
658 this.threadGroup.list(); 659 } 660 661 // reset the in-memory state for testing 662 completed.clear(); 663 rollbackStack.clear(); 664 procedures.clear(); 665 nonceKeysToProcIdsMap.clear(); 666 scheduler.clear(); 667 lastProcId.set(-1); 668 } 669 670 public void refreshConfiguration(final Configuration conf) { 671 this.conf = conf; 672 setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, DEFAULT_WORKER_KEEP_ALIVE_TIME), 673 TimeUnit.MILLISECONDS); 674 } 675 676 // ========================================================================== 677 // Accessors 678 // ========================================================================== 679 public boolean isRunning() { 680 return running.get(); 681 } 682 683 /** 684 * @return the current number of worker threads. 685 */ 686 public int getWorkerThreadCount() { 687 return workerThreads.size(); 688 } 689 690 /** 691 * @return the core pool size settings. 692 */ 693 public int getCorePoolSize() { 694 return corePoolSize; 695 } 696 697 public int getActiveExecutorCount() { 698 return activeExecutorCount.get(); 699 } 700 701 public TEnvironment getEnvironment() { 702 return this.environment; 703 } 704 705 public ProcedureStore getStore() { 706 return this.store; 707 } 708 709 ProcedureScheduler getScheduler() { 710 return scheduler; 711 } 712 713 public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) { 714 this.keepAliveTime = timeUnit.toMillis(keepAliveTime); 715 this.scheduler.signalAll(); 716 } 717 718 public long getKeepAliveTime(final TimeUnit timeUnit) { 719 return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS); 720 } 721 722 // ========================================================================== 723 // Submit/Remove Chores 724 // ========================================================================== 725 726 /** 727 * Add a chore procedure to the executor 728 * @param chore the chore to add 729 */ 730 public void addChore(@Nullable 
ProcedureInMemoryChore<TEnvironment> chore) { 731 if (chore == null) { 732 return; 733 } 734 chore.setState(ProcedureState.WAITING_TIMEOUT); 735 timeoutExecutor.add(chore); 736 } 737 738 /** 739 * Remove a chore procedure from the executor 740 * @param chore the chore to remove 741 * @return whether the chore is removed, or it will be removed later 742 */ 743 public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) { 744 if (chore == null) { 745 return true; 746 } 747 chore.setState(ProcedureState.SUCCESS); 748 return timeoutExecutor.remove(chore); 749 } 750 751 // ========================================================================== 752 // Nonce Procedure helpers 753 // ========================================================================== 754 /** 755 * Create a NonceKey from the specified nonceGroup and nonce. 756 * @param nonceGroup the group to use for the {@link NonceKey} 757 * @param nonce the nonce to use in the {@link NonceKey} 758 * @return the generated NonceKey 759 */ 760 public NonceKey createNonceKey(final long nonceGroup, final long nonce) { 761 return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce); 762 } 763 764 /** 765 * Register a nonce for a procedure that is going to be submitted. A procId will be reserved and 766 * on submitProcedure(), the procedure with the specified nonce will take the reserved ProcId. If 767 * someone already reserved the nonce, this method will return the procId reserved, otherwise an 768 * invalid procId will be returned. and the caller should procede and submit the procedure. 769 * @param nonceKey A unique identifier for this operation from the client or process. 770 * @return the procId associated with the nonce, if any otherwise an invalid procId. 
771 */ 772 public long registerNonce(final NonceKey nonceKey) { 773 if (nonceKey == null) { 774 return -1; 775 } 776 777 // check if we have already a Reserved ID for the nonce 778 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); 779 if (oldProcId == null) { 780 // reserve a new Procedure ID, this will be associated with the nonce 781 // and the procedure submitted with the specified nonce will use this ID. 782 final long newProcId = nextProcId(); 783 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); 784 if (oldProcId == null) { 785 return -1; 786 } 787 } 788 789 // we found a registered nonce, but the procedure may not have been submitted yet. 790 // since the client expect the procedure to be submitted, spin here until it is. 791 final boolean traceEnabled = LOG.isTraceEnabled(); 792 while ( 793 isRunning() && !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) 794 && nonceKeysToProcIdsMap.containsKey(nonceKey) 795 ) { 796 if (traceEnabled) { 797 LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted"); 798 } 799 Threads.sleep(100); 800 } 801 return oldProcId.longValue(); 802 } 803 804 /** 805 * Remove the NonceKey if the procedure was not submitted to the executor. 806 * @param nonceKey A unique identifier for this operation from the client or process. 807 */ 808 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { 809 if (nonceKey == null) { 810 return; 811 } 812 813 final Long procId = nonceKeysToProcIdsMap.get(nonceKey); 814 if (procId == null) { 815 return; 816 } 817 818 // if the procedure was not submitted, remove the nonce 819 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { 820 nonceKeysToProcIdsMap.remove(nonceKey); 821 } 822 } 823 824 /** 825 * If the failure failed before submitting it, we may want to give back the same error to the 826 * requests with the same nonceKey. 
827 * @param nonceKey A unique identifier for this operation from the client or process 828 * @param procName name of the procedure, used to inform the user 829 * @param procOwner name of the owner of the procedure, used to inform the user 830 * @param exception the failure to report to the user 831 */ 832 public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, 833 IOException exception) { 834 if (nonceKey == null) { 835 return; 836 } 837 838 Long procId = nonceKeysToProcIdsMap.get(nonceKey); 839 if (procId == null || completed.containsKey(procId)) { 840 return; 841 } 842 843 completed.computeIfAbsent(procId, (key) -> { 844 Procedure<TEnvironment> proc = 845 new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); 846 847 return new CompletedProcedureRetainer<>(proc); 848 }); 849 } 850 851 // ========================================================================== 852 // Submit/Abort Procedure 853 // ========================================================================== 854 /** 855 * Add a new root-procedure to the executor. 856 * @param proc the new procedure to execute. 857 * @return the procedure id, that can be used to monitor the operation 858 */ 859 public long submitProcedure(Procedure<TEnvironment> proc) { 860 return submitProcedure(proc, null); 861 } 862 863 /** 864 * Bypass a procedure. If the procedure is set to bypass, all the logic in execute/rollback will 865 * be ignored and it will return success, whatever. It is used to recover buggy stuck procedures, 866 * releasing the lock resources and letting other procedures run. Bypassing one procedure (and its 867 * ancestors will be bypassed automatically) may leave the cluster in a middle state, e.g. region 868 * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, the 869 * operators may have to do some clean up on hdfs or schedule some assign procedures to let region 870 * online. 
DO AT YOUR OWN RISK. 871 * <p> 872 * A procedure can be bypassed only if 1. The procedure is in state of RUNNABLE, WAITING, 873 * WAITING_TIMEOUT or it is a root procedure without any child. 2. No other worker thread is 874 * executing it 3. No child procedure has been submitted 875 * <p> 876 * If all the requirements are meet, the procedure and its ancestors will be bypassed and 877 * persisted to WAL. 878 * <p> 879 * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. TODO: What 880 * about WAITING_TIMEOUT? 881 * @param pids the procedure id 882 * @param lockWait time to wait lock 883 * @param force if force set to true, we will bypass the procedure even if it is executing. 884 * This is for procedures which can't break out during executing(due to bug, 885 * mostly) In this case, bypassing the procedure is not enough, since it is 886 * already stuck there. We need to restart the master after bypassing, and 887 * letting the problematic procedure to execute wth bypass=true, so in that 888 * condition, the procedure can be successfully bypassed. 889 * @param recursive We will do an expensive search for children of each pid. EXPENSIVE! 
890 * @return true if bypass success 891 * @throws IOException IOException 892 */ 893 public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, 894 boolean recursive) throws IOException { 895 List<Boolean> result = new ArrayList<Boolean>(pids.size()); 896 for (long pid : pids) { 897 result.add(bypassProcedure(pid, lockWait, force, recursive)); 898 } 899 return result; 900 } 901 902 boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) 903 throws IOException { 904 Preconditions.checkArgument(lockWait > 0, "lockWait should be positive"); 905 final Procedure<TEnvironment> procedure = getProcedure(pid); 906 if (procedure == null) { 907 LOG.debug("Procedure pid={} does not exist, skipping bypass", pid); 908 return false; 909 } 910 911 LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", procedure, lockWait, 912 override, recursive); 913 IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); 914 if (lockEntry == null && !override) { 915 LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", lockWait, 916 procedure, override); 917 return false; 918 } else if (lockEntry == null) { 919 LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", lockWait, 920 procedure, override); 921 } 922 try { 923 // check whether the procedure is already finished 924 if (procedure.isFinished()) { 925 LOG.debug("{} is already finished, skipping bypass", procedure); 926 return false; 927 } 928 929 if (procedure.hasChildren()) { 930 if (recursive) { 931 // EXPENSIVE. Checks each live procedure of which there could be many!!! 932 // Is there another way to get children of a procedure? 933 LOG.info("Recursive bypass on children of pid={}", procedure.getProcId()); 934 this.procedures.forEachValue(1 /* Single-threaded */, 935 // Transformer 936 v -> v.getParentProcId() == procedure.getProcId() ? 
v : null, 937 // Consumer 938 v -> { 939 try { 940 bypassProcedure(v.getProcId(), lockWait, override, recursive); 941 } catch (IOException e) { 942 LOG.warn("Recursive bypass of pid={}", v.getProcId(), e); 943 } 944 }); 945 } else { 946 LOG.debug("{} has children, skipping bypass", procedure); 947 return false; 948 } 949 } 950 951 // If the procedure has no parent or no child, we are safe to bypass it in whatever state 952 if ( 953 procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE 954 && procedure.getState() != ProcedureState.WAITING 955 && procedure.getState() != ProcedureState.WAITING_TIMEOUT 956 ) { 957 LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " 958 + "(with no parent), {}", procedure); 959 // Question: how is the bypass done here? 960 return false; 961 } 962 963 // Now, the procedure is not finished, and no one can execute it since we take the lock now 964 // And we can be sure that its ancestor is not running too, since their child has not 965 // finished yet 966 Procedure<TEnvironment> current = procedure; 967 while (current != null) { 968 LOG.debug("Bypassing {}", current); 969 current.bypass(getEnvironment()); 970 store.update(current); 971 long parentID = current.getParentProcId(); 972 current = getProcedure(parentID); 973 } 974 975 // wake up waiting procedure, already checked there is no child 976 if (procedure.getState() == ProcedureState.WAITING) { 977 procedure.setState(ProcedureState.RUNNABLE); 978 store.update(procedure); 979 } 980 981 // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. 
982 // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE 983 if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 984 LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); 985 if (timeoutExecutor.remove(procedure)) { 986 LOG.debug("removed procedure {} from timeoutExecutor", procedure); 987 timeoutExecutor.executeTimedoutProcedure(procedure); 988 } 989 } else if (lockEntry != null) { 990 scheduler.addFront(procedure); 991 LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure); 992 } else { 993 // If we don't have the lock, we can't re-submit the queue, 994 // since it is already executing. To get rid of the stuck situation, we 995 // need to restart the master. With the procedure set to bypass, the procedureExecutor 996 // will bypass it and won't get stuck again. 997 LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " 998 + "skipping add to queue", procedure); 999 } 1000 return true; 1001 1002 } finally { 1003 if (lockEntry != null) { 1004 procExecutionLock.releaseLockEntry(lockEntry); 1005 } 1006 } 1007 } 1008 1009 /** 1010 * Add a new root-procedure to the executor. 1011 * @param proc the new procedure to execute. 1012 * @param nonceKey the registered unique identifier for this operation from the client or process. 
1013 * @return the procedure id, that can be used to monitor the operation 1014 */ 1015 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", 1016 justification = "FindBugs is blind to the check-for-null") 1017 public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { 1018 Preconditions.checkArgument(lastProcId.get() >= 0); 1019 1020 prepareProcedure(proc); 1021 1022 final Long currentProcId; 1023 if (nonceKey != null) { 1024 currentProcId = nonceKeysToProcIdsMap.get(nonceKey); 1025 Preconditions.checkArgument(currentProcId != null, 1026 "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc); 1027 } else { 1028 currentProcId = nextProcId(); 1029 } 1030 1031 // Initialize the procedure 1032 proc.setNonceKey(nonceKey); 1033 proc.setProcId(currentProcId.longValue()); 1034 1035 // Commit the transaction 1036 store.insert(proc, null); 1037 LOG.debug("Stored {}", proc); 1038 1039 // Add the procedure to the executor 1040 return pushProcedure(proc); 1041 } 1042 1043 /** 1044 * Add a set of new root-procedure to the executor. 1045 * @param procs the new procedures to execute. 1046 */ 1047 // TODO: Do we need to take nonces here? 
1048 public void submitProcedures(Procedure<TEnvironment>[] procs) { 1049 Preconditions.checkArgument(lastProcId.get() >= 0); 1050 if (procs == null || procs.length <= 0) { 1051 return; 1052 } 1053 1054 // Prepare procedure 1055 for (int i = 0; i < procs.length; ++i) { 1056 prepareProcedure(procs[i]).setProcId(nextProcId()); 1057 } 1058 1059 // Commit the transaction 1060 store.insert(procs); 1061 if (LOG.isDebugEnabled()) { 1062 LOG.debug("Stored " + Arrays.toString(procs)); 1063 } 1064 1065 // Add the procedure to the executor 1066 for (int i = 0; i < procs.length; ++i) { 1067 pushProcedure(procs[i]); 1068 } 1069 } 1070 1071 private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { 1072 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); 1073 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); 1074 if (this.checkOwnerSet) { 1075 Preconditions.checkArgument(proc.hasOwner(), "missing owner"); 1076 } 1077 return proc; 1078 } 1079 1080 private long pushProcedure(Procedure<TEnvironment> proc) { 1081 final long currentProcId = proc.getProcId(); 1082 1083 // Update metrics on start of a procedure 1084 proc.updateMetricsOnSubmit(getEnvironment()); 1085 1086 // Create the rollback stack for the procedure 1087 RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); 1088 rollbackStack.put(currentProcId, stack); 1089 1090 // Submit the new subprocedures 1091 assert !procedures.containsKey(currentProcId); 1092 procedures.put(currentProcId, proc); 1093 sendProcedureAddedNotification(currentProcId); 1094 scheduler.addBack(proc); 1095 return proc.getProcId(); 1096 } 1097 1098 /** 1099 * Send an abort notification the specified procedure. Depending on the procedure implementation 1100 * the abort can be considered or ignored. 1101 * @param procId the procedure to abort 1102 * @return true if the procedure exists and has received the abort, otherwise false. 
1103 */ 1104 public boolean abort(long procId) { 1105 return abort(procId, true); 1106 } 1107 1108 /** 1109 * Send an abort notification to the specified procedure. Depending on the procedure 1110 * implementation, the abort can be considered or ignored. 1111 * @param procId the procedure to abort 1112 * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? 1113 * @return true if the procedure exists and has received the abort, otherwise false. 1114 */ 1115 public boolean abort(long procId, boolean mayInterruptIfRunning) { 1116 Procedure<TEnvironment> proc = procedures.get(procId); 1117 if (proc != null) { 1118 if (!mayInterruptIfRunning && proc.wasExecuted()) { 1119 return false; 1120 } 1121 return proc.abort(getEnvironment()); 1122 } 1123 return false; 1124 } 1125 1126 // ========================================================================== 1127 // Executor query helpers 1128 // ========================================================================== 1129 public Procedure<TEnvironment> getProcedure(final long procId) { 1130 return procedures.get(procId); 1131 } 1132 1133 public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { 1134 Procedure<TEnvironment> proc = getProcedure(procId); 1135 if (clazz.isInstance(proc)) { 1136 return clazz.cast(proc); 1137 } 1138 return null; 1139 } 1140 1141 public Procedure<TEnvironment> getResult(long procId) { 1142 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1143 if (retainer == null) { 1144 return null; 1145 } else { 1146 return retainer.getProcedure(); 1147 } 1148 } 1149 1150 /** 1151 * Return true if the procedure is finished. The state may be "completed successfully" or "failed 1152 * and rolledback". Use getResult() to check the state or get the result data. 1153 * @param procId the ID of the procedure to check 1154 * @return true if the procedure execution is finished, otherwise false. 
1155 */ 1156 public boolean isFinished(final long procId) { 1157 return !procedures.containsKey(procId); 1158 } 1159 1160 /** 1161 * Return true if the procedure is started. 1162 * @param procId the ID of the procedure to check 1163 * @return true if the procedure execution is started, otherwise false. 1164 */ 1165 public boolean isStarted(long procId) { 1166 Procedure<?> proc = procedures.get(procId); 1167 if (proc == null) { 1168 return completed.get(procId) != null; 1169 } 1170 return proc.wasExecuted(); 1171 } 1172 1173 /** 1174 * Mark the specified completed procedure, as ready to remove. 1175 * @param procId the ID of the procedure to remove 1176 */ 1177 public void removeResult(long procId) { 1178 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1179 if (retainer == null) { 1180 assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; 1181 LOG.debug("pid={} already removed by the cleaner.", procId); 1182 return; 1183 } 1184 1185 // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 1186 retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); 1187 } 1188 1189 public Procedure<TEnvironment> getResultOrProcedure(long procId) { 1190 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1191 if (retainer == null) { 1192 return procedures.get(procId); 1193 } else { 1194 return retainer.getProcedure(); 1195 } 1196 } 1197 1198 /** 1199 * Check if the user is this procedure's owner 1200 * @param procId the target procedure 1201 * @param user the user 1202 * @return true if the user is the owner of the procedure, false otherwise or the owner is 1203 * unknown. 
1204 */ 1205 public boolean isProcedureOwner(long procId, User user) { 1206 if (user == null) { 1207 return false; 1208 } 1209 final Procedure<TEnvironment> runningProc = procedures.get(procId); 1210 if (runningProc != null) { 1211 return runningProc.getOwner().equals(user.getShortName()); 1212 } 1213 1214 final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1215 if (retainer != null) { 1216 return retainer.getProcedure().getOwner().equals(user.getShortName()); 1217 } 1218 1219 // Procedure either does not exist or has already completed and got cleaned up. 1220 // At this time, we cannot check the owner of the procedure 1221 return false; 1222 } 1223 1224 /** 1225 * Should only be used when starting up, where the procedure workers have not been started. 1226 * <p/> 1227 * If the procedure works has been started, the return values maybe changed when you are 1228 * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as 1229 * it will do a copy, and also include the finished procedures. 1230 */ 1231 public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() { 1232 return procedures.values(); 1233 } 1234 1235 /** 1236 * Get procedures. 1237 * @return the procedures in a list 1238 */ 1239 public List<Procedure<TEnvironment>> getProcedures() { 1240 List<Procedure<TEnvironment>> procedureList = 1241 new ArrayList<>(procedures.size() + completed.size()); 1242 procedureList.addAll(procedures.values()); 1243 // Note: The procedure could show up twice in the list with different state, as 1244 // it could complete after we walk through procedures list and insert into 1245 // procedureList - it is ok, as we will use the information in the Procedure 1246 // to figure it out; to prevent this would increase the complexity of the logic. 
1247 completed.values().stream().map(CompletedProcedureRetainer::getProcedure) 1248 .forEach(procedureList::add); 1249 return procedureList; 1250 } 1251 1252 // ========================================================================== 1253 // Listeners helpers 1254 // ========================================================================== 1255 public void registerListener(ProcedureExecutorListener listener) { 1256 this.listeners.add(listener); 1257 } 1258 1259 public boolean unregisterListener(ProcedureExecutorListener listener) { 1260 return this.listeners.remove(listener); 1261 } 1262 1263 private void sendProcedureLoadedNotification(final long procId) { 1264 if (!this.listeners.isEmpty()) { 1265 for (ProcedureExecutorListener listener : this.listeners) { 1266 try { 1267 listener.procedureLoaded(procId); 1268 } catch (Throwable e) { 1269 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1270 } 1271 } 1272 } 1273 } 1274 1275 private void sendProcedureAddedNotification(final long procId) { 1276 if (!this.listeners.isEmpty()) { 1277 for (ProcedureExecutorListener listener : this.listeners) { 1278 try { 1279 listener.procedureAdded(procId); 1280 } catch (Throwable e) { 1281 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1282 } 1283 } 1284 } 1285 } 1286 1287 private void sendProcedureFinishedNotification(final long procId) { 1288 if (!this.listeners.isEmpty()) { 1289 for (ProcedureExecutorListener listener : this.listeners) { 1290 try { 1291 listener.procedureFinished(procId); 1292 } catch (Throwable e) { 1293 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1294 } 1295 } 1296 } 1297 } 1298 1299 // ========================================================================== 1300 // Procedure IDs helpers 1301 // ========================================================================== 1302 private long nextProcId() { 1303 long procId = lastProcId.incrementAndGet(); 1304 if (procId < 0) { 
1305 while (!lastProcId.compareAndSet(procId, 0)) { 1306 procId = lastProcId.get(); 1307 if (procId >= 0) { 1308 break; 1309 } 1310 } 1311 while (procedures.containsKey(procId)) { 1312 procId = lastProcId.incrementAndGet(); 1313 } 1314 } 1315 assert procId >= 0 : "Invalid procId " + procId; 1316 return procId; 1317 } 1318 1319 protected long getLastProcId() { 1320 return lastProcId.get(); 1321 } 1322 1323 public Set<Long> getActiveProcIds() { 1324 return procedures.keySet(); 1325 } 1326 1327 Long getRootProcedureId(Procedure<TEnvironment> proc) { 1328 return Procedure.getRootProcedureId(procedures, proc); 1329 } 1330 1331 // ========================================================================== 1332 // Executions 1333 // ========================================================================== 1334 private void executeProcedure(Procedure<TEnvironment> proc) { 1335 if (proc.isFinished()) { 1336 LOG.debug("{} is already finished, skipping execution", proc); 1337 return; 1338 } 1339 final Long rootProcId = getRootProcedureId(proc); 1340 if (rootProcId == null) { 1341 // The 'proc' was ready to run but the root procedure was rolledback 1342 LOG.warn("Rollback because parent is done/rolledback proc=" + proc); 1343 executeRollback(proc); 1344 return; 1345 } 1346 1347 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 1348 if (procStack == null) { 1349 LOG.warn("RootProcedureState is null for " + proc.getProcId()); 1350 return; 1351 } 1352 do { 1353 // Try to acquire the execution 1354 if (!procStack.acquire(proc)) { 1355 if (procStack.setRollback()) { 1356 // we have the 'rollback-lock' we can start rollingback 1357 switch (executeRollback(rootProcId, procStack)) { 1358 case LOCK_ACQUIRED: 1359 break; 1360 case LOCK_YIELD_WAIT: 1361 procStack.unsetRollback(); 1362 scheduler.yield(proc); 1363 break; 1364 case LOCK_EVENT_WAIT: 1365 LOG.info("LOCK_EVENT_WAIT rollback..." 
+ proc); 1366 procStack.unsetRollback(); 1367 break; 1368 default: 1369 throw new UnsupportedOperationException(); 1370 } 1371 } else { 1372 // if we can't rollback means that some child is still running. 1373 // the rollback will be executed after all the children are done. 1374 // If the procedure was never executed, remove and mark it as rolledback. 1375 if (!proc.wasExecuted()) { 1376 switch (executeRollback(proc)) { 1377 case LOCK_ACQUIRED: 1378 break; 1379 case LOCK_YIELD_WAIT: 1380 scheduler.yield(proc); 1381 break; 1382 case LOCK_EVENT_WAIT: 1383 LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); 1384 break; 1385 default: 1386 throw new UnsupportedOperationException(); 1387 } 1388 } 1389 } 1390 break; 1391 } 1392 1393 // Execute the procedure 1394 assert proc.getState() == ProcedureState.RUNNABLE : proc; 1395 // Note that lock is NOT about concurrency but rather about ensuring 1396 // ownership of a procedure of an entity such as a region or table 1397 LockState lockState = acquireLock(proc); 1398 switch (lockState) { 1399 case LOCK_ACQUIRED: 1400 execProcedure(procStack, proc); 1401 break; 1402 case LOCK_YIELD_WAIT: 1403 LOG.info(lockState + " " + proc); 1404 scheduler.yield(proc); 1405 break; 1406 case LOCK_EVENT_WAIT: 1407 // Someone will wake us up when the lock is available 1408 LOG.debug(lockState + " " + proc); 1409 break; 1410 default: 1411 throw new UnsupportedOperationException(); 1412 } 1413 procStack.release(proc); 1414 1415 if (proc.isSuccess()) { 1416 // update metrics on finishing the procedure 1417 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); 1418 LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); 1419 // Finalize the procedure state 1420 if (proc.getProcId() == rootProcId) { 1421 procedureFinished(proc); 1422 } else { 1423 execCompletionCleanup(proc); 1424 } 1425 break; 1426 } 1427 } while (procStack.isFailed()); 1428 } 1429 1430 private LockState 
acquireLock(Procedure<TEnvironment> proc) { 1431 TEnvironment env = getEnvironment(); 1432 // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if 1433 // hasLock is true. 1434 if (proc.hasLock()) { 1435 return LockState.LOCK_ACQUIRED; 1436 } 1437 return proc.doAcquireLock(env, store); 1438 } 1439 1440 private void releaseLock(Procedure<TEnvironment> proc, boolean force) { 1441 TEnvironment env = getEnvironment(); 1442 // For how the framework works, we know that we will always have the lock 1443 // when we call releaseLock(), so we can avoid calling proc.hasLock() 1444 if (force || !proc.holdLock(env) || proc.isFinished()) { 1445 proc.doReleaseLock(env, store); 1446 } 1447 } 1448 1449 /** 1450 * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the 1451 * root-procedure will be visible as finished to user, and the result will be the fatal exception. 1452 */ 1453 private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { 1454 Procedure<TEnvironment> rootProc = procedures.get(rootProcId); 1455 RemoteProcedureException exception = rootProc.getException(); 1456 // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are 1457 // rolling back because the subprocedure does. Clarify. 1458 if (exception == null) { 1459 exception = procStack.getException(); 1460 rootProc.setFailure(exception); 1461 store.update(rootProc); 1462 } 1463 1464 List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); 1465 assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; 1466 1467 int stackTail = subprocStack.size(); 1468 while (stackTail-- > 0) { 1469 Procedure<TEnvironment> proc = subprocStack.get(stackTail); 1470 IdLock.Entry lockEntry = null; 1471 // Hold the execution lock if it is not held by us. 
The IdLock is not reentrant so we need 1472 // this check, as the worker will hold the lock before executing a procedure. This is the only 1473 // place where we may hold two procedure execution locks, and there is a fence in the 1474 // RootProcedureState where we can make sure that only one worker can execute the rollback of 1475 // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to 1476 // prevent race between us and the force update thread. 1477 if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) { 1478 try { 1479 lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1480 } catch (IOException e) { 1481 // can only happen if interrupted, so not a big deal to propagate it 1482 throw new UncheckedIOException(e); 1483 } 1484 } 1485 try { 1486 // For the sub procedures which are successfully finished, we do not rollback them. 1487 // Typically, if we want to rollback a procedure, we first need to rollback it, and then 1488 // recursively rollback its ancestors. The state changes which are done by sub procedures 1489 // should be handled by parent procedures when rolling back. For example, when rolling back 1490 // a MergeTableProcedure, we will schedule new procedures to bring the offline regions 1491 // online, instead of rolling back the original procedures which offlined the regions(in 1492 // fact these procedures can not be rolled back...). 
1493 if (proc.isSuccess()) { 1494 // Just do the cleanup work, without actually executing the rollback 1495 subprocStack.remove(stackTail); 1496 cleanupAfterRollbackOneStep(proc); 1497 continue; 1498 } 1499 LockState lockState = acquireLock(proc); 1500 if (lockState != LockState.LOCK_ACQUIRED) { 1501 // can't take a lock on the procedure, add the root-proc back on the 1502 // queue waiting for the lock availability 1503 return lockState; 1504 } 1505 1506 lockState = executeRollback(proc); 1507 releaseLock(proc, false); 1508 boolean abortRollback = lockState != LockState.LOCK_ACQUIRED; 1509 abortRollback |= !isRunning() || !store.isRunning(); 1510 1511 // allows to kill the executor before something is stored to the wal. 1512 // useful to test the procedure recovery. 1513 if (abortRollback) { 1514 return lockState; 1515 } 1516 1517 subprocStack.remove(stackTail); 1518 1519 // if the procedure is kind enough to pass the slot to someone else, yield 1520 // if the proc is already finished, do not yield 1521 if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { 1522 return LockState.LOCK_YIELD_WAIT; 1523 } 1524 1525 if (proc != rootProc) { 1526 execCompletionCleanup(proc); 1527 } 1528 } finally { 1529 if (lockEntry != null) { 1530 procExecutionLock.releaseLockEntry(lockEntry); 1531 } 1532 } 1533 } 1534 1535 // Finalize the procedure state 1536 LOG.info("Rolled back {} exec-time={}", rootProc, 1537 StringUtils.humanTimeDiff(rootProc.elapsedTime())); 1538 procedureFinished(rootProc); 1539 return LockState.LOCK_ACQUIRED; 1540 } 1541 1542 private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) { 1543 if (proc.removeStackIndex()) { 1544 if (!proc.isSuccess()) { 1545 proc.setState(ProcedureState.ROLLEDBACK); 1546 } 1547 1548 // update metrics on finishing the procedure (fail) 1549 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false); 1550 1551 if (proc.hasParent()) { 1552 store.delete(proc.getProcId()); 1553 
procedures.remove(proc.getProcId()); 1554 } else { 1555 final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); 1556 if (childProcIds != null) { 1557 store.delete(proc, childProcIds); 1558 } else { 1559 store.update(proc); 1560 } 1561 } 1562 } else { 1563 store.update(proc); 1564 } 1565 } 1566 1567 /** 1568 * Execute the rollback of the procedure step. It updates the store with the new state (stack 1569 * index) or will remove completly the procedure in case it is a child. 1570 */ 1571 private LockState executeRollback(Procedure<TEnvironment> proc) { 1572 try { 1573 proc.doRollback(getEnvironment()); 1574 } catch (IOException e) { 1575 LOG.debug("Roll back attempt failed for {}", proc, e); 1576 return LockState.LOCK_YIELD_WAIT; 1577 } catch (InterruptedException e) { 1578 handleInterruptedException(proc, e); 1579 return LockState.LOCK_YIELD_WAIT; 1580 } catch (Throwable e) { 1581 // Catch NullPointerExceptions or similar errors... 1582 LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); 1583 } 1584 1585 // allows to kill the executor before something is stored to the wal. 1586 // useful to test the procedure recovery. 1587 if (testing != null && testing.shouldKillBeforeStoreUpdate()) { 1588 String msg = "TESTING: Kill before store update"; 1589 LOG.debug(msg); 1590 stop(); 1591 throw new RuntimeException(msg); 1592 } 1593 1594 cleanupAfterRollbackOneStep(proc); 1595 1596 return LockState.LOCK_ACQUIRED; 1597 } 1598 1599 private void yieldProcedure(Procedure<TEnvironment> proc) { 1600 releaseLock(proc, false); 1601 scheduler.yield(proc); 1602 } 1603 1604 /** 1605 * Executes <code>procedure</code> 1606 * <ul> 1607 * <li>Calls the doExecute() of the procedure 1608 * <li>If the procedure execution didn't fail (i.e. valid user input) 1609 * <ul> 1610 * <li>...and returned subprocedures 1611 * <ul> 1612 * <li>The subprocedures are initialized. 
1613 * <li>The subprocedures are added to the store 1614 * <li>The subprocedures are added to the runnable queue 1615 * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete 1616 * </ul> 1617 * </li> 1618 * <li>...if there are no subprocedure 1619 * <ul> 1620 * <li>the procedure completed successfully 1621 * <li>if there is a parent (WAITING) 1622 * <li>the parent state will be set to RUNNABLE 1623 * </ul> 1624 * </li> 1625 * </ul> 1626 * </li> 1627 * <li>In case of failure 1628 * <ul> 1629 * <li>The store is updated with the new state</li> 1630 * <li>The executor (caller of this method) will start the rollback of the procedure</li> 1631 * </ul> 1632 * </li> 1633 * </ul> 1634 */ 1635 private void execProcedure(RootProcedureState<TEnvironment> procStack, 1636 Procedure<TEnvironment> procedure) { 1637 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 1638 "NOT RUNNABLE! " + procedure.toString()); 1639 1640 // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. 1641 // The exception is caught below and then we hurry to the exit without disturbing state. The 1642 // idea is that the processing of this procedure will be unsuspended later by an external event 1643 // such the report of a region open. 1644 boolean suspended = false; 1645 1646 // Whether to 're-' -execute; run through the loop again. 
1647 boolean reExecute = false; 1648 1649 Procedure<TEnvironment>[] subprocs = null; 1650 do { 1651 reExecute = false; 1652 procedure.resetPersistence(); 1653 try { 1654 subprocs = procedure.doExecute(getEnvironment()); 1655 if (subprocs != null && subprocs.length == 0) { 1656 subprocs = null; 1657 } 1658 } catch (ProcedureSuspendedException e) { 1659 LOG.trace("Suspend {}", procedure); 1660 suspended = true; 1661 } catch (ProcedureYieldException e) { 1662 LOG.trace("Yield {}", procedure, e); 1663 yieldProcedure(procedure); 1664 return; 1665 } catch (InterruptedException e) { 1666 LOG.trace("Yield interrupt {}", procedure, e); 1667 handleInterruptedException(procedure, e); 1668 yieldProcedure(procedure); 1669 return; 1670 } catch (Throwable e) { 1671 // Catch NullPointerExceptions or similar errors... 1672 String msg = "CODE-BUG: Uncaught runtime exception: " + procedure; 1673 LOG.error(msg, e); 1674 procedure.setFailure(new RemoteProcedureException(msg, e)); 1675 } 1676 1677 if (!procedure.isFailed()) { 1678 if (subprocs != null) { 1679 if (subprocs.length == 1 && subprocs[0] == procedure) { 1680 // Procedure returned itself. Quick-shortcut for a state machine-like procedure; 1681 // i.e. we go around this loop again rather than go back out on the scheduler queue. 1682 subprocs = null; 1683 reExecute = true; 1684 LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); 1685 } else { 1686 // Yield the current procedure, and make the subprocedure runnable 1687 // subprocs may come back 'null'. 1688 subprocs = initializeChildren(procStack, procedure, subprocs); 1689 LOG.info("Initialized subprocedures=" + (subprocs == null 1690 ? 
null 1691 : Stream.of(subprocs).map(e -> "{" + e.toString() + "}").collect(Collectors.toList()) 1692 .toString())); 1693 } 1694 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1695 LOG.trace("Added to timeoutExecutor {}", procedure); 1696 timeoutExecutor.add(procedure); 1697 } else if (!suspended) { 1698 // No subtask, so we are done 1699 procedure.setState(ProcedureState.SUCCESS); 1700 } 1701 } 1702 1703 // Add the procedure to the stack 1704 procStack.addRollbackStep(procedure); 1705 1706 // allows to kill the executor before something is stored to the wal. 1707 // useful to test the procedure recovery. 1708 if ( 1709 testing != null && testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent()) 1710 ) { 1711 kill("TESTING: Kill BEFORE store update: " + procedure); 1712 } 1713 1714 // TODO: The code here doesn't check if store is running before persisting to the store as 1715 // it relies on the method call below to throw RuntimeException to wind up the stack and 1716 // executor thread to stop. The statement following the method call below seems to check if 1717 // store is not running, to prevent scheduling children procedures, re-execution or yield 1718 // of this procedure. This may need more scrutiny and subsequent cleanup in future 1719 // 1720 // Commit the transaction even if a suspend (state may have changed). Note this append 1721 // can take a bunch of time to complete. 
1722 if (procedure.needPersistence()) { 1723 updateStoreOnExec(procStack, procedure, subprocs); 1724 } 1725 1726 // if the store is not running we are aborting 1727 if (!store.isRunning()) { 1728 return; 1729 } 1730 // if the procedure is kind enough to pass the slot to someone else, yield 1731 if ( 1732 procedure.isRunnable() && !suspended 1733 && procedure.isYieldAfterExecutionStep(getEnvironment()) 1734 ) { 1735 yieldProcedure(procedure); 1736 return; 1737 } 1738 1739 assert (reExecute && subprocs == null) || !reExecute; 1740 } while (reExecute); 1741 1742 // Allows to kill the executor after something is stored to the WAL but before the below 1743 // state settings are done -- in particular the one on the end where we make parent 1744 // RUNNABLE again when its children are done; see countDownChildren. 1745 if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) { 1746 kill("TESTING: Kill AFTER store update: " + procedure); 1747 } 1748 1749 // Submit the new subprocedures 1750 if (subprocs != null && !procedure.isFailed()) { 1751 submitChildrenProcedures(subprocs); 1752 } 1753 1754 // we need to log the release lock operation before waking up the parent procedure, as there 1755 // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all 1756 // the sub procedures from store and cause problems... 1757 releaseLock(procedure, false); 1758 1759 // if the procedure is complete and has a parent, count down the children latch. 1760 // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 
1761 if (!suspended && procedure.isFinished() && procedure.hasParent()) { 1762 countDownChildren(procStack, procedure); 1763 } 1764 } 1765 1766 private void kill(String msg) { 1767 LOG.debug(msg); 1768 stop(); 1769 throw new RuntimeException(msg); 1770 } 1771 1772 private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, 1773 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1774 assert subprocs != null : "expected subprocedures"; 1775 final long rootProcId = getRootProcedureId(procedure); 1776 for (int i = 0; i < subprocs.length; ++i) { 1777 Procedure<TEnvironment> subproc = subprocs[i]; 1778 if (subproc == null) { 1779 String msg = "subproc[" + i + "] is null, aborting the procedure"; 1780 procedure 1781 .setFailure(new RemoteProcedureException(msg, new IllegalArgumentIOException(msg))); 1782 return null; 1783 } 1784 1785 assert subproc.getState() == ProcedureState.INITIALIZING : subproc; 1786 subproc.setParentProcId(procedure.getProcId()); 1787 subproc.setRootProcId(rootProcId); 1788 subproc.setProcId(nextProcId()); 1789 procStack.addSubProcedure(subproc); 1790 } 1791 1792 if (!procedure.isFailed()) { 1793 procedure.setChildrenLatch(subprocs.length); 1794 switch (procedure.getState()) { 1795 case RUNNABLE: 1796 procedure.setState(ProcedureState.WAITING); 1797 break; 1798 case WAITING_TIMEOUT: 1799 timeoutExecutor.add(procedure); 1800 break; 1801 default: 1802 break; 1803 } 1804 } 1805 return subprocs; 1806 } 1807 1808 private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { 1809 for (int i = 0; i < subprocs.length; ++i) { 1810 Procedure<TEnvironment> subproc = subprocs[i]; 1811 subproc.updateMetricsOnSubmit(getEnvironment()); 1812 assert !procedures.containsKey(subproc.getProcId()); 1813 procedures.put(subproc.getProcId(), subproc); 1814 scheduler.addFront(subproc); 1815 } 1816 } 1817 1818 private void countDownChildren(RootProcedureState<TEnvironment> procStack, 1819 
Procedure<TEnvironment> procedure) { 1820 Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); 1821 if (parent == null) { 1822 assert procStack.isRollingback(); 1823 return; 1824 } 1825 1826 // If this procedure is the last child awake the parent procedure 1827 if (parent.tryRunnable()) { 1828 // If we succeeded in making the parent runnable -- i.e. all of its 1829 // children have completed, move parent to front of the queue. 1830 store.update(parent); 1831 scheduler.addFront(parent); 1832 LOG.info("Finished subprocedure pid={}, resume processing ppid={}", procedure.getProcId(), 1833 parent.getProcId()); 1834 return; 1835 } 1836 } 1837 1838 private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, 1839 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1840 if (subprocs != null && !procedure.isFailed()) { 1841 if (LOG.isTraceEnabled()) { 1842 LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); 1843 } 1844 store.insert(procedure, subprocs); 1845 } else { 1846 LOG.trace("Store update {}", procedure); 1847 if (procedure.isFinished() && !procedure.hasParent()) { 1848 // remove child procedures 1849 final long[] childProcIds = procStack.getSubprocedureIds(); 1850 if (childProcIds != null) { 1851 store.delete(procedure, childProcIds); 1852 for (int i = 0; i < childProcIds.length; ++i) { 1853 procedures.remove(childProcIds[i]); 1854 } 1855 } else { 1856 store.update(procedure); 1857 } 1858 } else { 1859 store.update(procedure); 1860 } 1861 } 1862 } 1863 1864 private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { 1865 LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); 1866 // NOTE: We don't call Thread.currentThread().interrupt() 1867 // because otherwise all the subsequent calls e.g. Thread.sleep() will throw 1868 // the InterruptedException. 
If the master is going down, we will be notified 1869 // and the executor/store will be stopped. 1870 // (The interrupted procedure will be retried on the next run) 1871 } 1872 1873 private void execCompletionCleanup(Procedure<TEnvironment> proc) { 1874 final TEnvironment env = getEnvironment(); 1875 if (proc.hasLock()) { 1876 LOG.warn("Usually this should not happen, we will release the lock before if the procedure" 1877 + " is finished, even if the holdLock is true, arrive here means we have some holes where" 1878 + " we do not release the lock. And the releaseLock below may fail since the procedure may" 1879 + " have already been deleted from the procedure store."); 1880 releaseLock(proc, true); 1881 } 1882 try { 1883 proc.completionCleanup(env); 1884 } catch (Throwable e) { 1885 // Catch NullPointerExceptions or similar errors... 1886 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); 1887 } 1888 } 1889 1890 private void procedureFinished(Procedure<TEnvironment> proc) { 1891 // call the procedure completion cleanup handler 1892 execCompletionCleanup(proc); 1893 1894 CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); 1895 1896 // update the executor internal state maps 1897 if (!proc.shouldWaitClientAck(getEnvironment())) { 1898 retainer.setClientAckTime(0); 1899 } 1900 1901 completed.put(proc.getProcId(), retainer); 1902 rollbackStack.remove(proc.getProcId()); 1903 procedures.remove(proc.getProcId()); 1904 1905 // call the runnableSet completion cleanup handler 1906 try { 1907 scheduler.completionCleanup(proc); 1908 } catch (Throwable e) { 1909 // Catch NullPointerExceptions or similar errors... 
1910 LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); 1911 } 1912 1913 // Notify the listeners 1914 sendProcedureFinishedNotification(proc.getProcId()); 1915 } 1916 1917 RootProcedureState<TEnvironment> getProcStack(long rootProcId) { 1918 return rollbackStack.get(rootProcId); 1919 } 1920 1921 ProcedureScheduler getProcedureScheduler() { 1922 return scheduler; 1923 } 1924 1925 int getCompletedSize() { 1926 return completed.size(); 1927 } 1928 1929 public IdLock getProcExecutionLock() { 1930 return procExecutionLock; 1931 } 1932 1933 // ========================================================================== 1934 // Worker Thread 1935 // ========================================================================== 1936 private class WorkerThread extends StoppableThread { 1937 private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); 1938 private volatile Procedure<TEnvironment> activeProcedure; 1939 1940 public WorkerThread(ThreadGroup group) { 1941 this(group, "PEWorker-"); 1942 } 1943 1944 protected WorkerThread(ThreadGroup group, String prefix) { 1945 super(group, prefix + workerId.incrementAndGet()); 1946 setDaemon(true); 1947 } 1948 1949 @Override 1950 public void sendStopSignal() { 1951 scheduler.signalAll(); 1952 } 1953 1954 /** 1955 * Encapsulates execution of the current {@link #activeProcedure} for easy tracing. 
1956 */ 1957 private long runProcedure() throws IOException { 1958 final Procedure<TEnvironment> proc = this.activeProcedure; 1959 int activeCount = activeExecutorCount.incrementAndGet(); 1960 int runningCount = store.setRunningProcedureCount(activeCount); 1961 LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, 1962 activeCount); 1963 executionStartTime.set(EnvironmentEdgeManager.currentTime()); 1964 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1965 try { 1966 executeProcedure(proc); 1967 } catch (AssertionError e) { 1968 LOG.info("ASSERT pid=" + proc.getProcId(), e); 1969 throw e; 1970 } finally { 1971 procExecutionLock.releaseLockEntry(lockEntry); 1972 activeCount = activeExecutorCount.decrementAndGet(); 1973 runningCount = store.setRunningProcedureCount(activeCount); 1974 LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, 1975 activeCount); 1976 this.activeProcedure = null; 1977 executionStartTime.set(Long.MAX_VALUE); 1978 } 1979 return EnvironmentEdgeManager.currentTime(); 1980 } 1981 1982 @Override 1983 public void run() { 1984 long lastUpdate = EnvironmentEdgeManager.currentTime(); 1985 try { 1986 while (isRunning() && keepAlive(lastUpdate)) { 1987 @SuppressWarnings("unchecked") 1988 Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); 1989 if (proc == null) { 1990 continue; 1991 } 1992 this.activeProcedure = proc; 1993 lastUpdate = TraceUtil.trace(this::runProcedure, new ProcedureSpanBuilder(proc)); 1994 } 1995 } catch (Throwable t) { 1996 LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); 1997 } finally { 1998 LOG.trace("Worker terminated."); 1999 } 2000 workerThreads.remove(this); 2001 } 2002 2003 @Override 2004 public String toString() { 2005 Procedure<?> p = this.activeProcedure; 2006 return getName() + "(pid=" + (p == null ? 
Procedure.NO_PROC_ID : p.getProcId() + ")"); 2007 } 2008 2009 /** 2010 * @return the time since the current procedure is running 2011 */ 2012 public long getCurrentRunTime() { 2013 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); 2014 } 2015 2016 // core worker never timeout 2017 protected boolean keepAlive(long lastUpdate) { 2018 return true; 2019 } 2020 } 2021 2022 // A worker thread which can be added when core workers are stuck. Will timeout after 2023 // keepAliveTime if there is no procedure to run. 2024 private final class KeepAliveWorkerThread extends WorkerThread { 2025 public KeepAliveWorkerThread(ThreadGroup group) { 2026 super(group, "KeepAlivePEWorker-"); 2027 } 2028 2029 @Override 2030 protected boolean keepAlive(long lastUpdate) { 2031 return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; 2032 } 2033 } 2034 2035 // ---------------------------------------------------------------------------- 2036 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the 2037 // full set of procedures pending and completed to write a compacted 2038 // version of the log (in case is a log)? 2039 // In theory no, procedures are have a short life, so at some point the store 2040 // will have the tracker saying everything is in the last log. 
// ----------------------------------------------------------------------------

  /**
   * Chore that watches the worker pool for threads that have been busy on their current procedure
   * for at least {@code stuckThreshold} ms.
   * <p>
   * A new {@link KeepAliveWorkerThread} is started (at most one per run) when all of these hold:
   * <ul>
   * <li>the scheduler has runnable work;</li>
   * <li>the fraction of stuck workers reaches {@code addWorkerStuckPercentage};</li>
   * <li>the pool is below {@code maxPoolSize}.</li>
   * </ul>
   * The tunables are re-read from {@code conf} after every run.
   */
  private final class WorkerMonitor extends InlineChore {
    /** Config key for the value returned by {@link #getTimeoutInterval()}, in ms. */
    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
      "hbase.procedure.worker.monitor.interval.msec";
    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec

    /** Config key for the run time, in ms, at or above which a worker counts as stuck. */
    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
      "hbase.procedure.worker.stuck.threshold.msec";
    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec

    /** Config key for the stuck-worker fraction (e.g. 0.5 for 50%) that triggers a new worker. */
    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
      "hbase.procedure.worker.add.stuck.percentage";
    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck

    private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;

    public WorkerMonitor() {
      refreshConfig();
    }

    @Override
    public void run() {
      final int stuckCount = checkForStuckWorkers();
      checkThreadCount(stuckCount);

      // refresh interval (poor man dynamic conf update)
      refreshConfig();
    }

    /**
     * Counts workers whose current run time is at least {@code stuckThreshold} ms, logging a
     * warning for each one.
     * @return the number of stuck workers
     */
    private int checkForStuckWorkers() {
      // check if any of the worker is stuck
      int stuckCount = 0;
      for (WorkerThread worker : workerThreads) {
        // Sample the run time once so the threshold check and the logged value agree.
        final long runTime = worker.getCurrentRunTime();
        if (runTime < stuckThreshold) {
          continue;
        }

        // WARN the worker is stuck
        stuckCount++;
        LOG.warn("Worker stuck {}, run time {}", worker, StringUtils.humanTimeDiff(runTime));
      }
      return stuckCount;
    }

    /**
     * Starts one extra keep-alive worker when enough workers are stuck, there is runnable work,
     * and the pool has room below {@code maxPoolSize}.
     * @param stuckCount number of stuck workers found by {@link #checkForStuckWorkers()}
     */
    private void checkThreadCount(final int stuckCount) {
      // nothing to do if there are no runnable tasks
      if (stuckCount < 1 || !scheduler.hasRunnables()) {
        return;
      }

      // add a new thread if the worker stuck percentage exceed the threshold limit
      // and every handler is active.
      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
      // let's add new worker thread more aggressively, as they will timeout finally if there is no
      // work to do.
      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) {
        final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
        workerThreads.add(worker);
        worker.start();
        LOG.debug("Added new worker thread {}", worker);
      }
    }

    /** Re-reads the three tunables from {@code conf}, falling back to the defaults above. */
    private void refreshConfig() {
      addWorkerStuckPercentage =
        conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
      timeoutInterval =
        conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, DEFAULT_WORKER_MONITOR_INTERVAL);
      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, DEFAULT_WORKER_STUCK_THRESHOLD);
    }

    /**
     * Returns the most recently configured monitor interval in ms. NOTE(review): presumably the
     * period between {@link #run()} invocations; confirm against InlineChore's contract.
     */
    @Override
    public int getTimeoutInterval() {
      return timeoutInterval;
    }
  }
}