001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.UncheckedIOException; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Deque; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.Executor; 033import java.util.concurrent.Executors; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicLong; 038import java.util.stream.Collectors; 039import java.util.stream.Stream; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 043import org.apache.hadoop.hbase.log.HBaseMarkers; 044import org.apache.hadoop.hbase.procedure2.Procedure.LockState; 045import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 048import org.apache.hadoop.hbase.procedure2.util.StringUtils; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.IdLock; 052import org.apache.hadoop.hbase.util.NonceKey; 053import org.apache.hadoop.hbase.util.Threads; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 063 064/** 065 * Thread Pool that executes the submitted procedures. 066 * The executor has a ProcedureStore associated. 067 * Each operation is logged and on restart the pending procedures are resumed. 068 * 069 * Unless the Procedure code throws an error (e.g. invalid user input) 070 * the procedure will complete (at some point in time). On restart the pending 071 * procedures are resumed and the ones that failed will be rolled back. 
072 * 073 * The user can add procedures to the executor via submitProcedure(proc) 074 * check for the finished state via isFinished(procId) 075 * and get the result via getResult(procId) 076 */ 077@InterfaceAudience.Private 078public class ProcedureExecutor<TEnvironment> { 079 private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); 080 081 public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; 082 private static final boolean DEFAULT_CHECK_OWNER_SET = false; 083 084 public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = 085 "hbase.procedure.worker.keep.alive.time.msec"; 086 private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); 087 088 public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; 089 static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min 090 091 public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl"; 092 static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min 093 094 /** 095 * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to 096 * break PE having it fail at various junctures. When non-null, testing is set to an instance of 097 * the below internal {@link Testing} class with flags set for the particular test. 098 */ 099 volatile Testing testing = null; 100 101 /** 102 * Class with parameters describing how to fail/die when in testing-context. 103 */ 104 public static class Testing { 105 protected volatile boolean killIfHasParent = true; 106 protected volatile boolean killIfSuspended = false; 107 108 /** 109 * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is 110 * persisting all the state it needs to recover after a crash. 
111 */ 112 protected volatile boolean killBeforeStoreUpdate = false; 113 protected volatile boolean toggleKillBeforeStoreUpdate = false; 114 115 /** 116 * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978 117 * is about a case where memory-state was being set after store to WAL where a crash could 118 * cause us to get stuck. This flag allows killing at what was a vulnerable time. 119 */ 120 protected volatile boolean killAfterStoreUpdate = false; 121 protected volatile boolean toggleKillAfterStoreUpdate = false; 122 123 protected boolean shouldKillBeforeStoreUpdate() { 124 final boolean kill = this.killBeforeStoreUpdate; 125 if (this.toggleKillBeforeStoreUpdate) { 126 this.killBeforeStoreUpdate = !kill; 127 LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); 128 } 129 return kill; 130 } 131 132 protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) { 133 if (isSuspended && !killIfSuspended) { 134 return false; 135 } 136 if (hasParent && !killIfHasParent) { 137 return false; 138 } 139 return shouldKillBeforeStoreUpdate(); 140 } 141 142 protected boolean shouldKillAfterStoreUpdate() { 143 final boolean kill = this.killAfterStoreUpdate; 144 if (this.toggleKillAfterStoreUpdate) { 145 this.killAfterStoreUpdate = !kill; 146 LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate); 147 } 148 return kill; 149 } 150 151 protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { 152 return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate(); 153 } 154 } 155 156 public interface ProcedureExecutorListener { 157 void procedureLoaded(long procId); 158 void procedureAdded(long procId); 159 void procedureFinished(long procId); 160 } 161 162 /** 163 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. 
* Once a Root-Procedure completes (success or failure), the result will be added to this map.
 * The user of ProcedureExecutor should call getResult(procId) to get the result.
 */
private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
    new ConcurrentHashMap<>();

/**
 * Maps the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
 * The RootProcedureState contains the execution stack of the Root-Procedure,
 * It is added to the map by submitProcedure() and removed on procedure completion.
 */
private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
    new ConcurrentHashMap<>();

/**
 * Helper map to lookup the live procedures by ID.
 * This map contains every procedure. root-procedures and subprocedures.
 */
private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
    new ConcurrentHashMap<>();

/**
 * Helper map to lookup whether the procedure already issued from the same client. This map
 * contains every root procedure.
 */
private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
    new CopyOnWriteArrayList<>();

/** Current configuration; replaced by refreshConfiguration(). */
private Configuration conf;

/**
 * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
 * resource handling rather than observing in a #join is unexpected).
 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
 * (Should be ok).
 */
private ThreadGroup threadGroup;

/**
 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
 * resource handling rather than observing in a #join is unexpected).
 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
 * (Should be ok).
 */
private CopyOnWriteArrayList<WorkerThread> workerThreads;

/**
 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
 * resource handling rather than observing in a #join is unexpected).
 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
 * (Should be ok).
 */
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

private int corePoolSize;
private int maxPoolSize;

/** Worker keep-alive time, stored in milliseconds. */
private volatile long keepAliveTime;

/**
 * Scheduler/Queue that contains runnable procedures.
 */
private final ProcedureScheduler scheduler;

/** Single daemon thread on which the store's force-update requests are run. */
private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

/** Last assigned procedure id; -1 until the store reports the max id during load. */
private final AtomicLong lastProcId = new AtomicLong(-1);
private final AtomicLong workerId = new AtomicLong(0);
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false);
private final TEnvironment environment;
private final ProcedureStore store;

private final boolean checkOwnerSet;

// To prevent concurrent execution of the same procedure.
// For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
// procedure is woken up before we finish the suspend which causes the same procedures to be
// executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
// a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
// execution of the same procedure.
248 private final IdLock procExecutionLock = new IdLock(); 249 250 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 251 final ProcedureStore store) { 252 this(conf, environment, store, new SimpleProcedureScheduler()); 253 } 254 255 private boolean isRootFinished(Procedure<?> proc) { 256 Procedure<?> rootProc = procedures.get(proc.getRootProcId()); 257 return rootProc == null || rootProc.isFinished(); 258 } 259 260 private void forceUpdateProcedure(long procId) throws IOException { 261 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); 262 try { 263 Procedure<TEnvironment> proc = procedures.get(procId); 264 if (proc != null) { 265 if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) { 266 LOG.debug("Procedure {} has already been finished and parent is succeeded," + 267 " skip force updating", proc); 268 return; 269 } 270 } else { 271 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 272 if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) { 273 LOG.debug("No pending procedure with id = {}, skip force updating.", procId); 274 return; 275 } 276 long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL); 277 long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL); 278 if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) { 279 LOG.debug("Procedure {} has already been finished and expired, skip force updating", 280 procId); 281 return; 282 } 283 proc = retainer.getProcedure(); 284 } 285 LOG.debug("Force update procedure {}", proc); 286 store.update(proc); 287 } finally { 288 procExecutionLock.releaseLockEntry(lockEntry); 289 } 290 } 291 292 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 293 final ProcedureStore store, final ProcedureScheduler scheduler) { 294 this.environment = environment; 295 this.scheduler = scheduler; 296 this.store = store; 297 this.conf = conf; 298 
this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); 299 refreshConfiguration(conf); 300 store.registerListener(new ProcedureStoreListener() { 301 302 @Override 303 public void forceUpdate(long[] procIds) { 304 Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> { 305 try { 306 forceUpdateProcedure(procId); 307 } catch (IOException e) { 308 LOG.warn("Failed to force update procedure with pid={}", procId); 309 } 310 })); 311 } 312 }); 313 } 314 315 private void load(final boolean abortOnCorruption) throws IOException { 316 Preconditions.checkArgument(completed.isEmpty(), "completed not empty"); 317 Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty"); 318 Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty"); 319 Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty"); 320 321 store.load(new ProcedureStore.ProcedureLoader() { 322 @Override 323 public void setMaxProcId(long maxProcId) { 324 assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()"; 325 lastProcId.set(maxProcId); 326 } 327 328 @Override 329 public void load(ProcedureIterator procIter) throws IOException { 330 loadProcedures(procIter, abortOnCorruption); 331 } 332 333 @Override 334 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 335 int corruptedCount = 0; 336 while (procIter.hasNext()) { 337 Procedure<?> proc = procIter.next(); 338 LOG.error("Corrupt " + proc); 339 corruptedCount++; 340 } 341 if (abortOnCorruption && corruptedCount > 0) { 342 throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay"); 343 } 344 } 345 }); 346 } 347 348 private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) { 349 proc.restoreLock(getEnvironment()); 350 restored.add(proc.getProcId()); 351 } 352 353 private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) { 354 while 
(!stack.isEmpty()) { 355 restoreLock(stack.pop(), restored); 356 } 357 } 358 359 // Restore the locks for all the procedures. 360 // Notice that we need to restore the locks starting from the root proc, otherwise there will be 361 // problem that a sub procedure may hold the exclusive lock first and then we are stuck when 362 // calling the acquireLock method for the parent procedure. 363 // The algorithm is straight-forward: 364 // 1. Use a set to record the procedures which locks have already been restored. 365 // 2. Use a stack to store the hierarchy of the procedures 366 // 3. For all the procedure, we will first try to find its parent and push it into the stack, 367 // unless 368 // a. We have no parent, i.e, we are the root procedure 369 // b. The lock has already been restored(by checking the set introduced in #1) 370 // then we start to pop the stack and call acquireLock for each procedure. 371 // Notice that this should be done for all procedures, not only the ones in runnableList. 372 private void restoreLocks() { 373 Set<Long> restored = new HashSet<>(); 374 Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>(); 375 procedures.values().forEach(proc -> { 376 for (;;) { 377 if (restored.contains(proc.getProcId())) { 378 restoreLocks(stack, restored); 379 return; 380 } 381 if (!proc.hasParent()) { 382 restoreLock(proc, restored); 383 restoreLocks(stack, restored); 384 return; 385 } 386 stack.push(proc); 387 proc = procedures.get(proc.getParentProcId()); 388 } 389 }); 390 } 391 392 private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption) 393 throws IOException { 394 // 1. 
Build the rollback stack 395 int runnableCount = 0; 396 int failedCount = 0; 397 int waitingCount = 0; 398 int waitingTimeoutCount = 0; 399 while (procIter.hasNext()) { 400 boolean finished = procIter.isNextFinished(); 401 @SuppressWarnings("unchecked") 402 Procedure<TEnvironment> proc = procIter.next(); 403 NonceKey nonceKey = proc.getNonceKey(); 404 long procId = proc.getProcId(); 405 406 if (finished) { 407 completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc)); 408 LOG.debug("Completed {}", proc); 409 } else { 410 if (!proc.hasParent()) { 411 assert !proc.isFinished() : "unexpected finished procedure"; 412 rollbackStack.put(proc.getProcId(), new RootProcedureState<>()); 413 } 414 415 // add the procedure to the map 416 proc.beforeReplay(getEnvironment()); 417 procedures.put(proc.getProcId(), proc); 418 switch (proc.getState()) { 419 case RUNNABLE: 420 runnableCount++; 421 break; 422 case FAILED: 423 failedCount++; 424 break; 425 case WAITING: 426 waitingCount++; 427 break; 428 case WAITING_TIMEOUT: 429 waitingTimeoutCount++; 430 break; 431 default: 432 break; 433 } 434 } 435 436 if (nonceKey != null) { 437 nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map 438 } 439 } 440 441 // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will 442 // push it into the ProcedureScheduler directly to execute the rollback. But this does not work 443 // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove 444 // the queue from runQueue in scheduler, and then when a procedure which has lock access, for 445 // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler, 446 // we will add the queue back to let the workers poll from it. 
The assumption here is that, the 447 // procedure which has the xlock should have been polled out already, so when loading we can not 448 // add the procedure to scheduler first and then call acquireLock, since the procedure is still 449 // in the queue, and since we will remove the queue from runQueue, then no one can poll it out, 450 // then there is a dead lock 451 List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount); 452 List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount); 453 List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount); 454 List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount); 455 procIter.reset(); 456 while (procIter.hasNext()) { 457 if (procIter.isNextFinished()) { 458 procIter.skipNext(); 459 continue; 460 } 461 462 @SuppressWarnings("unchecked") 463 Procedure<TEnvironment> proc = procIter.next(); 464 assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc; 465 LOG.debug("Loading {}", proc); 466 Long rootProcId = getRootProcedureId(proc); 467 // The orphan procedures will be passed to handleCorrupted, so add an assert here 468 assert rootProcId != null; 469 470 if (proc.hasParent()) { 471 Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId()); 472 if (parent != null && !proc.isFinished()) { 473 parent.incChildrenLatch(); 474 } 475 } 476 477 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 478 procStack.loadStack(proc); 479 480 proc.setRootProcId(rootProcId); 481 switch (proc.getState()) { 482 case RUNNABLE: 483 runnableList.add(proc); 484 break; 485 case WAITING: 486 waitingList.add(proc); 487 break; 488 case WAITING_TIMEOUT: 489 waitingTimeoutList.add(proc); 490 break; 491 case FAILED: 492 failedList.add(proc); 493 break; 494 case ROLLEDBACK: 495 case INITIALIZING: 496 String msg = "Unexpected " + proc.getState() + " state for " + proc; 497 LOG.error(msg); 498 throw new 
UnsupportedOperationException(msg); 499 default: 500 break; 501 } 502 } 503 504 // 3. Check the waiting procedures to see if some of them can be added to runnable. 505 waitingList.forEach(proc -> { 506 if (!proc.hasChildren()) { 507 // Normally, WAITING procedures should be waken by its children. But, there is a case that, 508 // all the children are successful and before they can wake up their parent procedure, the 509 // master was killed. So, during recovering the procedures from ProcedureWal, its children 510 // are not loaded because of their SUCCESS state. So we need to continue to run this WAITING 511 // procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a 512 // exception will throw: 513 // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 514 // "NOT RUNNABLE! " + procedure.toString()); 515 proc.setState(ProcedureState.RUNNABLE); 516 runnableList.add(proc); 517 } else { 518 proc.afterReplay(getEnvironment()); 519 } 520 }); 521 // 4. restore locks 522 restoreLocks(); 523 524 // 5. Push the procedures to the timeout executor 525 waitingTimeoutList.forEach(proc -> { 526 proc.afterReplay(getEnvironment()); 527 timeoutExecutor.add(proc); 528 }); 529 530 // 6. Push the procedure to the scheduler 531 failedList.forEach(scheduler::addBack); 532 runnableList.forEach(p -> { 533 p.afterReplay(getEnvironment()); 534 if (!p.hasParent()) { 535 sendProcedureLoadedNotification(p.getProcId()); 536 } 537 scheduler.addBack(p); 538 }); 539 // After all procedures put into the queue, signal the worker threads. 540 // Otherwise, there is a race condition. See HBASE-21364. 541 scheduler.signalAll(); 542 } 543 544 /** 545 * Initialize the procedure executor, but do not start workers. We will start them later. 
546 * <p/> 547 * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and 548 * ensure a single executor, and start the procedure replay to resume and recover the previous 549 * pending and in-progress procedures. 550 * @param numThreads number of threads available for procedure execution. 551 * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure 552 * is found on replay. otherwise false. 553 */ 554 public void init(int numThreads, boolean abortOnCorruption) throws IOException { 555 // We have numThreads executor + one timer thread used for timing out 556 // procedures and triggering periodic procedures. 557 this.corePoolSize = numThreads; 558 this.maxPoolSize = 10 * numThreads; 559 LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}", 560 corePoolSize, maxPoolSize); 561 562 this.threadGroup = new ThreadGroup("PEWorkerGroup"); 563 this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup); 564 565 // Create the workers 566 workerId.set(0); 567 workerThreads = new CopyOnWriteArrayList<>(); 568 for (int i = 0; i < corePoolSize; ++i) { 569 workerThreads.add(new WorkerThread(threadGroup)); 570 } 571 572 long st, et; 573 574 // Acquire the store lease. 575 st = System.nanoTime(); 576 store.recoverLease(); 577 et = System.nanoTime(); 578 LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(), 579 StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); 580 581 // start the procedure scheduler 582 scheduler.start(); 583 584 // TODO: Split in two steps. 585 // TODO: Handle corrupted procedures (currently just a warn) 586 // The first one will make sure that we have the latest id, 587 // so we can start the threads and accept new procedures. 588 // The second step will do the actual load of old procedures. 
589 st = System.nanoTime(); 590 load(abortOnCorruption); 591 et = System.nanoTime(); 592 LOG.info("Loaded {} in {}", store.getClass().getSimpleName(), 593 StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); 594 } 595 596 /** 597 * Start the workers. 598 */ 599 public void startWorkers() throws IOException { 600 if (!running.compareAndSet(false, true)) { 601 LOG.warn("Already running"); 602 return; 603 } 604 // Start the executors. Here we must have the lastProcId set. 605 LOG.trace("Start workers {}", workerThreads.size()); 606 timeoutExecutor.start(); 607 for (WorkerThread worker: workerThreads) { 608 worker.start(); 609 } 610 611 // Internal chores 612 timeoutExecutor.add(new WorkerMonitor()); 613 614 // Add completed cleaner chore 615 addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed, 616 nonceKeysToProcIdsMap)); 617 } 618 619 public void stop() { 620 if (!running.getAndSet(false)) { 621 return; 622 } 623 624 LOG.info("Stopping"); 625 scheduler.stop(); 626 timeoutExecutor.sendStopSignal(); 627 } 628 629 @VisibleForTesting 630 public void join() { 631 assert !isRunning() : "expected not running"; 632 633 // stop the timeout executor 634 timeoutExecutor.awaitTermination(); 635 636 // stop the worker threads 637 for (WorkerThread worker: workerThreads) { 638 worker.awaitTermination(); 639 } 640 641 // Destroy the Thread Group for the executors 642 // TODO: Fix. #join is not place to destroy resources. 643 try { 644 threadGroup.destroy(); 645 } catch (IllegalThreadStateException e) { 646 LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", 647 this.threadGroup, e.getMessage()); 648 // This dumps list of threads on STDOUT. 
649 this.threadGroup.list(); 650 } 651 652 // reset the in-memory state for testing 653 completed.clear(); 654 rollbackStack.clear(); 655 procedures.clear(); 656 nonceKeysToProcIdsMap.clear(); 657 scheduler.clear(); 658 lastProcId.set(-1); 659 } 660 661 public void refreshConfiguration(final Configuration conf) { 662 this.conf = conf; 663 setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, 664 DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS); 665 } 666 667 // ========================================================================== 668 // Accessors 669 // ========================================================================== 670 public boolean isRunning() { 671 return running.get(); 672 } 673 674 /** 675 * @return the current number of worker threads. 676 */ 677 public int getWorkerThreadCount() { 678 return workerThreads.size(); 679 } 680 681 /** 682 * @return the core pool size settings. 683 */ 684 public int getCorePoolSize() { 685 return corePoolSize; 686 } 687 688 public int getActiveExecutorCount() { 689 return activeExecutorCount.get(); 690 } 691 692 public TEnvironment getEnvironment() { 693 return this.environment; 694 } 695 696 public ProcedureStore getStore() { 697 return this.store; 698 } 699 700 ProcedureScheduler getScheduler() { 701 return scheduler; 702 } 703 704 public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) { 705 this.keepAliveTime = timeUnit.toMillis(keepAliveTime); 706 this.scheduler.signalAll(); 707 } 708 709 public long getKeepAliveTime(final TimeUnit timeUnit) { 710 return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS); 711 } 712 713 // ========================================================================== 714 // Submit/Remove Chores 715 // ========================================================================== 716 717 /** 718 * Add a chore procedure to the executor 719 * @param chore the chore to add 720 */ 721 public void addChore(ProcedureInMemoryChore<TEnvironment> 
chore) { 722 chore.setState(ProcedureState.WAITING_TIMEOUT); 723 timeoutExecutor.add(chore); 724 } 725 726 /** 727 * Remove a chore procedure from the executor 728 * @param chore the chore to remove 729 * @return whether the chore is removed, or it will be removed later 730 */ 731 public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) { 732 chore.setState(ProcedureState.SUCCESS); 733 return timeoutExecutor.remove(chore); 734 } 735 736 // ========================================================================== 737 // Nonce Procedure helpers 738 // ========================================================================== 739 /** 740 * Create a NonceKey from the specified nonceGroup and nonce. 741 * @param nonceGroup the group to use for the {@link NonceKey} 742 * @param nonce the nonce to use in the {@link NonceKey} 743 * @return the generated NonceKey 744 */ 745 public NonceKey createNonceKey(final long nonceGroup, final long nonce) { 746 return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce); 747 } 748 749 /** 750 * Register a nonce for a procedure that is going to be submitted. 751 * A procId will be reserved and on submitProcedure(), 752 * the procedure with the specified nonce will take the reserved ProcId. 753 * If someone already reserved the nonce, this method will return the procId reserved, 754 * otherwise an invalid procId will be returned. and the caller should procede 755 * and submit the procedure. 756 * 757 * @param nonceKey A unique identifier for this operation from the client or process. 758 * @return the procId associated with the nonce, if any otherwise an invalid procId. 
759 */ 760 public long registerNonce(final NonceKey nonceKey) { 761 if (nonceKey == null) { 762 return -1; 763 } 764 765 // check if we have already a Reserved ID for the nonce 766 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); 767 if (oldProcId == null) { 768 // reserve a new Procedure ID, this will be associated with the nonce 769 // and the procedure submitted with the specified nonce will use this ID. 770 final long newProcId = nextProcId(); 771 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); 772 if (oldProcId == null) { 773 return -1; 774 } 775 } 776 777 // we found a registered nonce, but the procedure may not have been submitted yet. 778 // since the client expect the procedure to be submitted, spin here until it is. 779 final boolean traceEnabled = LOG.isTraceEnabled(); 780 while (isRunning() && 781 !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && 782 nonceKeysToProcIdsMap.containsKey(nonceKey)) { 783 if (traceEnabled) { 784 LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted"); 785 } 786 Threads.sleep(100); 787 } 788 return oldProcId.longValue(); 789 } 790 791 /** 792 * Remove the NonceKey if the procedure was not submitted to the executor. 793 * @param nonceKey A unique identifier for this operation from the client or process. 794 */ 795 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { 796 if (nonceKey == null) { 797 return; 798 } 799 800 final Long procId = nonceKeysToProcIdsMap.get(nonceKey); 801 if (procId == null) { 802 return; 803 } 804 805 // if the procedure was not submitted, remove the nonce 806 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { 807 nonceKeysToProcIdsMap.remove(nonceKey); 808 } 809 } 810 811 /** 812 * If the failure failed before submitting it, we may want to give back the 813 * same error to the requests with the same nonceKey. 
814 * 815 * @param nonceKey A unique identifier for this operation from the client or process 816 * @param procName name of the procedure, used to inform the user 817 * @param procOwner name of the owner of the procedure, used to inform the user 818 * @param exception the failure to report to the user 819 */ 820 public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, 821 IOException exception) { 822 if (nonceKey == null) { 823 return; 824 } 825 826 Long procId = nonceKeysToProcIdsMap.get(nonceKey); 827 if (procId == null || completed.containsKey(procId)) { 828 return; 829 } 830 831 completed.computeIfAbsent(procId, (key) -> { 832 Procedure<TEnvironment> proc = new FailedProcedure<>(procId.longValue(), 833 procName, procOwner, nonceKey, exception); 834 835 return new CompletedProcedureRetainer<>(proc); 836 }); 837 } 838 839 // ========================================================================== 840 // Submit/Abort Procedure 841 // ========================================================================== 842 /** 843 * Add a new root-procedure to the executor. 844 * @param proc the new procedure to execute. 845 * @return the procedure id, that can be used to monitor the operation 846 */ 847 public long submitProcedure(Procedure<TEnvironment> proc) { 848 return submitProcedure(proc, null); 849 } 850 851 /** 852 * Bypass a procedure. If the procedure is set to bypass, all the logic in 853 * execute/rollback will be ignored and it will return success, whatever. 854 * It is used to recover buggy stuck procedures, releasing the lock resources 855 * and letting other procedures run. Bypassing one procedure (and its ancestors will 856 * be bypassed automatically) may leave the cluster in a middle state, e.g. region 857 * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, 858 * the operators may have to do some clean up on hdfs or schedule some assign procedures 859 * to let region online. 
DO AT YOUR OWN RISK. 860 * <p> 861 * A procedure can be bypassed only if 862 * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT 863 * or it is a root procedure without any child. 864 * 2. No other worker thread is executing it 865 * 3. No child procedure has been submitted 866 * 867 * <p> 868 * If all the requirements are meet, the procedure and its ancestors will be 869 * bypassed and persisted to WAL. 870 * 871 * <p> 872 * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. 873 * TODO: What about WAITING_TIMEOUT? 874 * @param pids the procedure id 875 * @param lockWait time to wait lock 876 * @param force if force set to true, we will bypass the procedure even if it is executing. 877 * This is for procedures which can't break out during executing(due to bug, mostly) 878 * In this case, bypassing the procedure is not enough, since it is already stuck 879 * there. We need to restart the master after bypassing, and letting the problematic 880 * procedure to execute wth bypass=true, so in that condition, the procedure can be 881 * successfully bypassed. 882 * @param recursive We will do an expensive search for children of each pid. EXPENSIVE! 
883 * @return true if bypass success 884 * @throws IOException IOException 885 */ 886 public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, 887 boolean recursive) 888 throws IOException { 889 List<Boolean> result = new ArrayList<Boolean>(pids.size()); 890 for(long pid: pids) { 891 result.add(bypassProcedure(pid, lockWait, force, recursive)); 892 } 893 return result; 894 } 895 896 boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) 897 throws IOException { 898 Preconditions.checkArgument(lockWait > 0, "lockWait should be positive"); 899 final Procedure<TEnvironment> procedure = getProcedure(pid); 900 if (procedure == null) { 901 LOG.debug("Procedure pid={} does not exist, skipping bypass", pid); 902 return false; 903 } 904 905 LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", 906 procedure, lockWait, override, recursive); 907 IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); 908 if (lockEntry == null && !override) { 909 LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", 910 lockWait, procedure, override); 911 return false; 912 } else if (lockEntry == null) { 913 LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", 914 lockWait, procedure, override); 915 } 916 try { 917 // check whether the procedure is already finished 918 if (procedure.isFinished()) { 919 LOG.debug("{} is already finished, skipping bypass", procedure); 920 return false; 921 } 922 923 if (procedure.hasChildren()) { 924 if (recursive) { 925 // EXPENSIVE. Checks each live procedure of which there could be many!!! 926 // Is there another way to get children of a procedure? 927 LOG.info("Recursive bypass on children of pid={}", procedure.getProcId()); 928 this.procedures.forEachValue(1 /*Single-threaded*/, 929 // Transformer 930 v -> v.getParentProcId() == procedure.getProcId()? 
v: null, 931 // Consumer 932 v -> { 933 try { 934 bypassProcedure(v.getProcId(), lockWait, override, recursive); 935 } catch (IOException e) { 936 LOG.warn("Recursive bypass of pid={}", v.getProcId(), e); 937 } 938 }); 939 } else { 940 LOG.debug("{} has children, skipping bypass", procedure); 941 return false; 942 } 943 } 944 945 // If the procedure has no parent or no child, we are safe to bypass it in whatever state 946 if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE 947 && procedure.getState() != ProcedureState.WAITING 948 && procedure.getState() != ProcedureState.WAITING_TIMEOUT) { 949 LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " 950 + "(with no parent), {}", 951 procedure); 952 // Question: how is the bypass done here? 953 return false; 954 } 955 956 // Now, the procedure is not finished, and no one can execute it since we take the lock now 957 // And we can be sure that its ancestor is not running too, since their child has not 958 // finished yet 959 Procedure<TEnvironment> current = procedure; 960 while (current != null) { 961 LOG.debug("Bypassing {}", current); 962 current.bypass(getEnvironment()); 963 store.update(procedure); 964 long parentID = current.getParentProcId(); 965 current = getProcedure(parentID); 966 } 967 968 //wake up waiting procedure, already checked there is no child 969 if (procedure.getState() == ProcedureState.WAITING) { 970 procedure.setState(ProcedureState.RUNNABLE); 971 store.update(procedure); 972 } 973 974 // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. 
975 // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE 976 if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 977 LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); 978 if (timeoutExecutor.remove(procedure)) { 979 LOG.debug("removed procedure {} from timeoutExecutor", procedure); 980 timeoutExecutor.executeTimedoutProcedure(procedure); 981 } 982 } else if (lockEntry != null) { 983 scheduler.addFront(procedure); 984 LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure); 985 } else { 986 // If we don't have the lock, we can't re-submit the queue, 987 // since it is already executing. To get rid of the stuck situation, we 988 // need to restart the master. With the procedure set to bypass, the procedureExecutor 989 // will bypass it and won't get stuck again. 990 LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " 991 + "skipping add to queue", 992 procedure); 993 } 994 return true; 995 996 } finally { 997 if (lockEntry != null) { 998 procExecutionLock.releaseLockEntry(lockEntry); 999 } 1000 } 1001 } 1002 1003 /** 1004 * Add a new root-procedure to the executor. 1005 * @param proc the new procedure to execute. 1006 * @param nonceKey the registered unique identifier for this operation from the client or process. 
1007 * @return the procedure id, that can be used to monitor the operation 1008 */ 1009 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", 1010 justification = "FindBugs is blind to the check-for-null") 1011 public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { 1012 Preconditions.checkArgument(lastProcId.get() >= 0); 1013 1014 prepareProcedure(proc); 1015 1016 final Long currentProcId; 1017 if (nonceKey != null) { 1018 currentProcId = nonceKeysToProcIdsMap.get(nonceKey); 1019 Preconditions.checkArgument(currentProcId != null, 1020 "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc); 1021 } else { 1022 currentProcId = nextProcId(); 1023 } 1024 1025 // Initialize the procedure 1026 proc.setNonceKey(nonceKey); 1027 proc.setProcId(currentProcId.longValue()); 1028 1029 // Commit the transaction 1030 store.insert(proc, null); 1031 LOG.debug("Stored {}", proc); 1032 1033 // Add the procedure to the executor 1034 return pushProcedure(proc); 1035 } 1036 1037 /** 1038 * Add a set of new root-procedure to the executor. 1039 * @param procs the new procedures to execute. 1040 */ 1041 // TODO: Do we need to take nonces here? 
1042 public void submitProcedures(Procedure<TEnvironment>[] procs) { 1043 Preconditions.checkArgument(lastProcId.get() >= 0); 1044 if (procs == null || procs.length <= 0) { 1045 return; 1046 } 1047 1048 // Prepare procedure 1049 for (int i = 0; i < procs.length; ++i) { 1050 prepareProcedure(procs[i]).setProcId(nextProcId()); 1051 } 1052 1053 // Commit the transaction 1054 store.insert(procs); 1055 if (LOG.isDebugEnabled()) { 1056 LOG.debug("Stored " + Arrays.toString(procs)); 1057 } 1058 1059 // Add the procedure to the executor 1060 for (int i = 0; i < procs.length; ++i) { 1061 pushProcedure(procs[i]); 1062 } 1063 } 1064 1065 private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { 1066 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); 1067 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); 1068 if (this.checkOwnerSet) { 1069 Preconditions.checkArgument(proc.hasOwner(), "missing owner"); 1070 } 1071 return proc; 1072 } 1073 1074 private long pushProcedure(Procedure<TEnvironment> proc) { 1075 final long currentProcId = proc.getProcId(); 1076 1077 // Update metrics on start of a procedure 1078 proc.updateMetricsOnSubmit(getEnvironment()); 1079 1080 // Create the rollback stack for the procedure 1081 RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); 1082 rollbackStack.put(currentProcId, stack); 1083 1084 // Submit the new subprocedures 1085 assert !procedures.containsKey(currentProcId); 1086 procedures.put(currentProcId, proc); 1087 sendProcedureAddedNotification(currentProcId); 1088 scheduler.addBack(proc); 1089 return proc.getProcId(); 1090 } 1091 1092 /** 1093 * Send an abort notification the specified procedure. 1094 * Depending on the procedure implementation the abort can be considered or ignored. 1095 * @param procId the procedure to abort 1096 * @return true if the procedure exists and has received the abort, otherwise false. 
1097 */ 1098 public boolean abort(long procId) { 1099 return abort(procId, true); 1100 } 1101 1102 /** 1103 * Send an abort notification to the specified procedure. 1104 * Depending on the procedure implementation, the abort can be considered or ignored. 1105 * @param procId the procedure to abort 1106 * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? 1107 * @return true if the procedure exists and has received the abort, otherwise false. 1108 */ 1109 public boolean abort(long procId, boolean mayInterruptIfRunning) { 1110 Procedure<TEnvironment> proc = procedures.get(procId); 1111 if (proc != null) { 1112 if (!mayInterruptIfRunning && proc.wasExecuted()) { 1113 return false; 1114 } 1115 return proc.abort(getEnvironment()); 1116 } 1117 return false; 1118 } 1119 1120 // ========================================================================== 1121 // Executor query helpers 1122 // ========================================================================== 1123 public Procedure<TEnvironment> getProcedure(final long procId) { 1124 return procedures.get(procId); 1125 } 1126 1127 public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { 1128 Procedure<TEnvironment> proc = getProcedure(procId); 1129 if (clazz.isInstance(proc)) { 1130 return clazz.cast(proc); 1131 } 1132 return null; 1133 } 1134 1135 public Procedure<TEnvironment> getResult(long procId) { 1136 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1137 if (retainer == null) { 1138 return null; 1139 } else { 1140 return retainer.getProcedure(); 1141 } 1142 } 1143 1144 /** 1145 * Return true if the procedure is finished. 1146 * The state may be "completed successfully" or "failed and rolledback". 1147 * Use getResult() to check the state or get the result data. 1148 * @param procId the ID of the procedure to check 1149 * @return true if the procedure execution is finished, otherwise false. 
1150 */ 1151 public boolean isFinished(final long procId) { 1152 return !procedures.containsKey(procId); 1153 } 1154 1155 /** 1156 * Return true if the procedure is started. 1157 * @param procId the ID of the procedure to check 1158 * @return true if the procedure execution is started, otherwise false. 1159 */ 1160 public boolean isStarted(long procId) { 1161 Procedure<?> proc = procedures.get(procId); 1162 if (proc == null) { 1163 return completed.get(procId) != null; 1164 } 1165 return proc.wasExecuted(); 1166 } 1167 1168 /** 1169 * Mark the specified completed procedure, as ready to remove. 1170 * @param procId the ID of the procedure to remove 1171 */ 1172 public void removeResult(long procId) { 1173 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1174 if (retainer == null) { 1175 assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; 1176 LOG.debug("pid={} already removed by the cleaner.", procId); 1177 return; 1178 } 1179 1180 // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 1181 retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); 1182 } 1183 1184 public Procedure<TEnvironment> getResultOrProcedure(long procId) { 1185 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1186 if (retainer == null) { 1187 return procedures.get(procId); 1188 } else { 1189 return retainer.getProcedure(); 1190 } 1191 } 1192 1193 /** 1194 * Check if the user is this procedure's owner 1195 * @param procId the target procedure 1196 * @param user the user 1197 * @return true if the user is the owner of the procedure, 1198 * false otherwise or the owner is unknown. 
1199 */ 1200 public boolean isProcedureOwner(long procId, User user) { 1201 if (user == null) { 1202 return false; 1203 } 1204 final Procedure<TEnvironment> runningProc = procedures.get(procId); 1205 if (runningProc != null) { 1206 return runningProc.getOwner().equals(user.getShortName()); 1207 } 1208 1209 final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1210 if (retainer != null) { 1211 return retainer.getProcedure().getOwner().equals(user.getShortName()); 1212 } 1213 1214 // Procedure either does not exist or has already completed and got cleaned up. 1215 // At this time, we cannot check the owner of the procedure 1216 return false; 1217 } 1218 1219 /** 1220 * Should only be used when starting up, where the procedure workers have not been started. 1221 * <p/> 1222 * If the procedure works has been started, the return values maybe changed when you are 1223 * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as 1224 * it will do a copy, and also include the finished procedures. 1225 */ 1226 public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() { 1227 return procedures.values(); 1228 } 1229 1230 /** 1231 * Get procedures. 1232 * @return the procedures in a list 1233 */ 1234 public List<Procedure<TEnvironment>> getProcedures() { 1235 List<Procedure<TEnvironment>> procedureList = 1236 new ArrayList<>(procedures.size() + completed.size()); 1237 procedureList.addAll(procedures.values()); 1238 // Note: The procedure could show up twice in the list with different state, as 1239 // it could complete after we walk through procedures list and insert into 1240 // procedureList - it is ok, as we will use the information in the Procedure 1241 // to figure it out; to prevent this would increase the complexity of the logic. 
1242 completed.values().stream().map(CompletedProcedureRetainer::getProcedure) 1243 .forEach(procedureList::add); 1244 return procedureList; 1245 } 1246 1247 // ========================================================================== 1248 // Listeners helpers 1249 // ========================================================================== 1250 public void registerListener(ProcedureExecutorListener listener) { 1251 this.listeners.add(listener); 1252 } 1253 1254 public boolean unregisterListener(ProcedureExecutorListener listener) { 1255 return this.listeners.remove(listener); 1256 } 1257 1258 private void sendProcedureLoadedNotification(final long procId) { 1259 if (!this.listeners.isEmpty()) { 1260 for (ProcedureExecutorListener listener: this.listeners) { 1261 try { 1262 listener.procedureLoaded(procId); 1263 } catch (Throwable e) { 1264 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1265 } 1266 } 1267 } 1268 } 1269 1270 private void sendProcedureAddedNotification(final long procId) { 1271 if (!this.listeners.isEmpty()) { 1272 for (ProcedureExecutorListener listener: this.listeners) { 1273 try { 1274 listener.procedureAdded(procId); 1275 } catch (Throwable e) { 1276 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1277 } 1278 } 1279 } 1280 } 1281 1282 private void sendProcedureFinishedNotification(final long procId) { 1283 if (!this.listeners.isEmpty()) { 1284 for (ProcedureExecutorListener listener: this.listeners) { 1285 try { 1286 listener.procedureFinished(procId); 1287 } catch (Throwable e) { 1288 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1289 } 1290 } 1291 } 1292 } 1293 1294 // ========================================================================== 1295 // Procedure IDs helpers 1296 // ========================================================================== 1297 private long nextProcId() { 1298 long procId = lastProcId.incrementAndGet(); 1299 if (procId < 0) { 
1300 while (!lastProcId.compareAndSet(procId, 0)) { 1301 procId = lastProcId.get(); 1302 if (procId >= 0) { 1303 break; 1304 } 1305 } 1306 while (procedures.containsKey(procId)) { 1307 procId = lastProcId.incrementAndGet(); 1308 } 1309 } 1310 assert procId >= 0 : "Invalid procId " + procId; 1311 return procId; 1312 } 1313 1314 @VisibleForTesting 1315 protected long getLastProcId() { 1316 return lastProcId.get(); 1317 } 1318 1319 @VisibleForTesting 1320 public Set<Long> getActiveProcIds() { 1321 return procedures.keySet(); 1322 } 1323 1324 Long getRootProcedureId(Procedure<TEnvironment> proc) { 1325 return Procedure.getRootProcedureId(procedures, proc); 1326 } 1327 1328 // ========================================================================== 1329 // Executions 1330 // ========================================================================== 1331 private void executeProcedure(Procedure<TEnvironment> proc) { 1332 if (proc.isFinished()) { 1333 LOG.debug("{} is already finished, skipping execution", proc); 1334 return; 1335 } 1336 final Long rootProcId = getRootProcedureId(proc); 1337 if (rootProcId == null) { 1338 // The 'proc' was ready to run but the root procedure was rolledback 1339 LOG.warn("Rollback because parent is done/rolledback proc=" + proc); 1340 executeRollback(proc); 1341 return; 1342 } 1343 1344 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 1345 if (procStack == null) { 1346 LOG.warn("RootProcedureState is null for " + proc.getProcId()); 1347 return; 1348 } 1349 do { 1350 // Try to acquire the execution 1351 if (!procStack.acquire(proc)) { 1352 if (procStack.setRollback()) { 1353 // we have the 'rollback-lock' we can start rollingback 1354 switch (executeRollback(rootProcId, procStack)) { 1355 case LOCK_ACQUIRED: 1356 break; 1357 case LOCK_YIELD_WAIT: 1358 procStack.unsetRollback(); 1359 scheduler.yield(proc); 1360 break; 1361 case LOCK_EVENT_WAIT: 1362 LOG.info("LOCK_EVENT_WAIT rollback..." 
+ proc); 1363 procStack.unsetRollback(); 1364 break; 1365 default: 1366 throw new UnsupportedOperationException(); 1367 } 1368 } else { 1369 // if we can't rollback means that some child is still running. 1370 // the rollback will be executed after all the children are done. 1371 // If the procedure was never executed, remove and mark it as rolledback. 1372 if (!proc.wasExecuted()) { 1373 switch (executeRollback(proc)) { 1374 case LOCK_ACQUIRED: 1375 break; 1376 case LOCK_YIELD_WAIT: 1377 scheduler.yield(proc); 1378 break; 1379 case LOCK_EVENT_WAIT: 1380 LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); 1381 break; 1382 default: 1383 throw new UnsupportedOperationException(); 1384 } 1385 } 1386 } 1387 break; 1388 } 1389 1390 // Execute the procedure 1391 assert proc.getState() == ProcedureState.RUNNABLE : proc; 1392 // Note that lock is NOT about concurrency but rather about ensuring 1393 // ownership of a procedure of an entity such as a region or table 1394 LockState lockState = acquireLock(proc); 1395 switch (lockState) { 1396 case LOCK_ACQUIRED: 1397 execProcedure(procStack, proc); 1398 break; 1399 case LOCK_YIELD_WAIT: 1400 LOG.info(lockState + " " + proc); 1401 scheduler.yield(proc); 1402 break; 1403 case LOCK_EVENT_WAIT: 1404 // Someone will wake us up when the lock is available 1405 LOG.debug(lockState + " " + proc); 1406 break; 1407 default: 1408 throw new UnsupportedOperationException(); 1409 } 1410 procStack.release(proc); 1411 1412 if (proc.isSuccess()) { 1413 // update metrics on finishing the procedure 1414 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); 1415 LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); 1416 // Finalize the procedure state 1417 if (proc.getProcId() == rootProcId) { 1418 procedureFinished(proc); 1419 } else { 1420 execCompletionCleanup(proc); 1421 } 1422 break; 1423 } 1424 } while (procStack.isFailed()); 1425 } 1426 1427 private LockState 
acquireLock(Procedure<TEnvironment> proc) { 1428 TEnvironment env = getEnvironment(); 1429 // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if 1430 // hasLock is true. 1431 if (proc.hasLock()) { 1432 return LockState.LOCK_ACQUIRED; 1433 } 1434 return proc.doAcquireLock(env, store); 1435 } 1436 1437 private void releaseLock(Procedure<TEnvironment> proc, boolean force) { 1438 TEnvironment env = getEnvironment(); 1439 // For how the framework works, we know that we will always have the lock 1440 // when we call releaseLock(), so we can avoid calling proc.hasLock() 1441 if (force || !proc.holdLock(env) || proc.isFinished()) { 1442 proc.doReleaseLock(env, store); 1443 } 1444 } 1445 1446 /** 1447 * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the 1448 * root-procedure will be visible as finished to user, and the result will be the fatal exception. 1449 */ 1450 private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { 1451 Procedure<TEnvironment> rootProc = procedures.get(rootProcId); 1452 RemoteProcedureException exception = rootProc.getException(); 1453 // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are 1454 // rolling back because the subprocedure does. Clarify. 1455 if (exception == null) { 1456 exception = procStack.getException(); 1457 rootProc.setFailure(exception); 1458 store.update(rootProc); 1459 } 1460 1461 List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); 1462 assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; 1463 1464 int stackTail = subprocStack.size(); 1465 while (stackTail-- > 0) { 1466 Procedure<TEnvironment> proc = subprocStack.get(stackTail); 1467 IdLock.Entry lockEntry = null; 1468 // Hold the execution lock if it is not held by us. 
The IdLock is not reentrant so we need 1469 // this check, as the worker will hold the lock before executing a procedure. This is the only 1470 // place where we may hold two procedure execution locks, and there is a fence in the 1471 // RootProcedureState where we can make sure that only one worker can execute the rollback of 1472 // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to 1473 // prevent race between us and the force update thread. 1474 if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) { 1475 try { 1476 lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1477 } catch (IOException e) { 1478 // can only happen if interrupted, so not a big deal to propagate it 1479 throw new UncheckedIOException(e); 1480 } 1481 } 1482 try { 1483 // For the sub procedures which are successfully finished, we do not rollback them. 1484 // Typically, if we want to rollback a procedure, we first need to rollback it, and then 1485 // recursively rollback its ancestors. The state changes which are done by sub procedures 1486 // should be handled by parent procedures when rolling back. For example, when rolling back 1487 // a MergeTableProcedure, we will schedule new procedures to bring the offline regions 1488 // online, instead of rolling back the original procedures which offlined the regions(in 1489 // fact these procedures can not be rolled back...). 
1490 if (proc.isSuccess()) { 1491 // Just do the cleanup work, without actually executing the rollback 1492 subprocStack.remove(stackTail); 1493 cleanupAfterRollbackOneStep(proc); 1494 continue; 1495 } 1496 LockState lockState = acquireLock(proc); 1497 if (lockState != LockState.LOCK_ACQUIRED) { 1498 // can't take a lock on the procedure, add the root-proc back on the 1499 // queue waiting for the lock availability 1500 return lockState; 1501 } 1502 1503 lockState = executeRollback(proc); 1504 releaseLock(proc, false); 1505 boolean abortRollback = lockState != LockState.LOCK_ACQUIRED; 1506 abortRollback |= !isRunning() || !store.isRunning(); 1507 1508 // allows to kill the executor before something is stored to the wal. 1509 // useful to test the procedure recovery. 1510 if (abortRollback) { 1511 return lockState; 1512 } 1513 1514 subprocStack.remove(stackTail); 1515 1516 // if the procedure is kind enough to pass the slot to someone else, yield 1517 // if the proc is already finished, do not yield 1518 if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { 1519 return LockState.LOCK_YIELD_WAIT; 1520 } 1521 1522 if (proc != rootProc) { 1523 execCompletionCleanup(proc); 1524 } 1525 } finally { 1526 if (lockEntry != null) { 1527 procExecutionLock.releaseLockEntry(lockEntry); 1528 } 1529 } 1530 } 1531 1532 // Finalize the procedure state 1533 LOG.info("Rolled back {} exec-time={}", rootProc, 1534 StringUtils.humanTimeDiff(rootProc.elapsedTime())); 1535 procedureFinished(rootProc); 1536 return LockState.LOCK_ACQUIRED; 1537 } 1538 1539 private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) { 1540 if (proc.removeStackIndex()) { 1541 if (!proc.isSuccess()) { 1542 proc.setState(ProcedureState.ROLLEDBACK); 1543 } 1544 1545 // update metrics on finishing the procedure (fail) 1546 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false); 1547 1548 if (proc.hasParent()) { 1549 store.delete(proc.getProcId()); 1550 
procedures.remove(proc.getProcId()); 1551 } else { 1552 final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); 1553 if (childProcIds != null) { 1554 store.delete(proc, childProcIds); 1555 } else { 1556 store.update(proc); 1557 } 1558 } 1559 } else { 1560 store.update(proc); 1561 } 1562 } 1563 1564 /** 1565 * Execute the rollback of the procedure step. 1566 * It updates the store with the new state (stack index) 1567 * or will remove completly the procedure in case it is a child. 1568 */ 1569 private LockState executeRollback(Procedure<TEnvironment> proc) { 1570 try { 1571 proc.doRollback(getEnvironment()); 1572 } catch (IOException e) { 1573 LOG.debug("Roll back attempt failed for {}", proc, e); 1574 return LockState.LOCK_YIELD_WAIT; 1575 } catch (InterruptedException e) { 1576 handleInterruptedException(proc, e); 1577 return LockState.LOCK_YIELD_WAIT; 1578 } catch (Throwable e) { 1579 // Catch NullPointerExceptions or similar errors... 1580 LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); 1581 } 1582 1583 // allows to kill the executor before something is stored to the wal. 1584 // useful to test the procedure recovery. 1585 if (testing != null && testing.shouldKillBeforeStoreUpdate()) { 1586 String msg = "TESTING: Kill before store update"; 1587 LOG.debug(msg); 1588 stop(); 1589 throw new RuntimeException(msg); 1590 } 1591 1592 cleanupAfterRollbackOneStep(proc); 1593 1594 return LockState.LOCK_ACQUIRED; 1595 } 1596 1597 private void yieldProcedure(Procedure<TEnvironment> proc) { 1598 releaseLock(proc, false); 1599 scheduler.yield(proc); 1600 } 1601 1602 /** 1603 * Executes <code>procedure</code> 1604 * <ul> 1605 * <li>Calls the doExecute() of the procedure 1606 * <li>If the procedure execution didn't fail (i.e. valid user input) 1607 * <ul> 1608 * <li>...and returned subprocedures 1609 * <ul><li>The subprocedures are initialized. 
1610 * <li>The subprocedures are added to the store 1611 * <li>The subprocedures are added to the runnable queue 1612 * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete 1613 * </ul> 1614 * </li> 1615 * <li>...if there are no subprocedure 1616 * <ul><li>the procedure completed successfully 1617 * <li>if there is a parent (WAITING) 1618 * <li>the parent state will be set to RUNNABLE 1619 * </ul> 1620 * </li> 1621 * </ul> 1622 * </li> 1623 * <li>In case of failure 1624 * <ul> 1625 * <li>The store is updated with the new state</li> 1626 * <li>The executor (caller of this method) will start the rollback of the procedure</li> 1627 * </ul> 1628 * </li> 1629 * </ul> 1630 */ 1631 private void execProcedure(RootProcedureState<TEnvironment> procStack, 1632 Procedure<TEnvironment> procedure) { 1633 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 1634 "NOT RUNNABLE! " + procedure.toString()); 1635 1636 // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. 1637 // The exception is caught below and then we hurry to the exit without disturbing state. The 1638 // idea is that the processing of this procedure will be unsuspended later by an external event 1639 // such the report of a region open. 1640 boolean suspended = false; 1641 1642 // Whether to 're-' -execute; run through the loop again. 
1643 boolean reExecute = false; 1644 1645 Procedure<TEnvironment>[] subprocs = null; 1646 do { 1647 reExecute = false; 1648 procedure.resetPersistence(); 1649 try { 1650 subprocs = procedure.doExecute(getEnvironment()); 1651 if (subprocs != null && subprocs.length == 0) { 1652 subprocs = null; 1653 } 1654 } catch (ProcedureSuspendedException e) { 1655 LOG.trace("Suspend {}", procedure); 1656 suspended = true; 1657 } catch (ProcedureYieldException e) { 1658 LOG.trace("Yield {}", procedure, e); 1659 yieldProcedure(procedure); 1660 return; 1661 } catch (InterruptedException e) { 1662 LOG.trace("Yield interrupt {}", procedure, e); 1663 handleInterruptedException(procedure, e); 1664 yieldProcedure(procedure); 1665 return; 1666 } catch (Throwable e) { 1667 // Catch NullPointerExceptions or similar errors... 1668 String msg = "CODE-BUG: Uncaught runtime exception: " + procedure; 1669 LOG.error(msg, e); 1670 procedure.setFailure(new RemoteProcedureException(msg, e)); 1671 } 1672 1673 if (!procedure.isFailed()) { 1674 if (subprocs != null) { 1675 if (subprocs.length == 1 && subprocs[0] == procedure) { 1676 // Procedure returned itself. Quick-shortcut for a state machine-like procedure; 1677 // i.e. we go around this loop again rather than go back out on the scheduler queue. 1678 subprocs = null; 1679 reExecute = true; 1680 LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); 1681 } else { 1682 // Yield the current procedure, and make the subprocedure runnable 1683 // subprocs may come back 'null'. 1684 subprocs = initializeChildren(procStack, procedure, subprocs); 1685 LOG.info("Initialized subprocedures=" + 1686 (subprocs == null? null: 1687 Stream.of(subprocs).map(e -> "{" + e.toString() + "}"). 
1688 collect(Collectors.toList()).toString())); 1689 } 1690 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1691 LOG.trace("Added to timeoutExecutor {}", procedure); 1692 timeoutExecutor.add(procedure); 1693 } else if (!suspended) { 1694 // No subtask, so we are done 1695 procedure.setState(ProcedureState.SUCCESS); 1696 } 1697 } 1698 1699 // Add the procedure to the stack 1700 procStack.addRollbackStep(procedure); 1701 1702 // allows to kill the executor before something is stored to the wal. 1703 // useful to test the procedure recovery. 1704 if (testing != null && 1705 testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) { 1706 kill("TESTING: Kill BEFORE store update: " + procedure); 1707 } 1708 1709 // TODO: The code here doesn't check if store is running before persisting to the store as 1710 // it relies on the method call below to throw RuntimeException to wind up the stack and 1711 // executor thread to stop. The statement following the method call below seems to check if 1712 // store is not running, to prevent scheduling children procedures, re-execution or yield 1713 // of this procedure. This may need more scrutiny and subsequent cleanup in future 1714 // 1715 // Commit the transaction even if a suspend (state may have changed). Note this append 1716 // can take a bunch of time to complete. 
1717 if (procedure.needPersistence()) { 1718 updateStoreOnExec(procStack, procedure, subprocs); 1719 } 1720 1721 // if the store is not running we are aborting 1722 if (!store.isRunning()) { 1723 return; 1724 } 1725 // if the procedure is kind enough to pass the slot to someone else, yield 1726 if (procedure.isRunnable() && !suspended && 1727 procedure.isYieldAfterExecutionStep(getEnvironment())) { 1728 yieldProcedure(procedure); 1729 return; 1730 } 1731 1732 assert (reExecute && subprocs == null) || !reExecute; 1733 } while (reExecute); 1734 1735 // Allows to kill the executor after something is stored to the WAL but before the below 1736 // state settings are done -- in particular the one on the end where we make parent 1737 // RUNNABLE again when its children are done; see countDownChildren. 1738 if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) { 1739 kill("TESTING: Kill AFTER store update: " + procedure); 1740 } 1741 1742 // Submit the new subprocedures 1743 if (subprocs != null && !procedure.isFailed()) { 1744 submitChildrenProcedures(subprocs); 1745 } 1746 1747 // we need to log the release lock operation before waking up the parent procedure, as there 1748 // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all 1749 // the sub procedures from store and cause problems... 1750 releaseLock(procedure, false); 1751 1752 // if the procedure is complete and has a parent, count down the children latch. 1753 // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 
1754 if (!suspended && procedure.isFinished() && procedure.hasParent()) { 1755 countDownChildren(procStack, procedure); 1756 } 1757 } 1758 1759 private void kill(String msg) { 1760 LOG.debug(msg); 1761 stop(); 1762 throw new RuntimeException(msg); 1763 } 1764 1765 private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, 1766 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1767 assert subprocs != null : "expected subprocedures"; 1768 final long rootProcId = getRootProcedureId(procedure); 1769 for (int i = 0; i < subprocs.length; ++i) { 1770 Procedure<TEnvironment> subproc = subprocs[i]; 1771 if (subproc == null) { 1772 String msg = "subproc[" + i + "] is null, aborting the procedure"; 1773 procedure.setFailure(new RemoteProcedureException(msg, 1774 new IllegalArgumentIOException(msg))); 1775 return null; 1776 } 1777 1778 assert subproc.getState() == ProcedureState.INITIALIZING : subproc; 1779 subproc.setParentProcId(procedure.getProcId()); 1780 subproc.setRootProcId(rootProcId); 1781 subproc.setProcId(nextProcId()); 1782 procStack.addSubProcedure(subproc); 1783 } 1784 1785 if (!procedure.isFailed()) { 1786 procedure.setChildrenLatch(subprocs.length); 1787 switch (procedure.getState()) { 1788 case RUNNABLE: 1789 procedure.setState(ProcedureState.WAITING); 1790 break; 1791 case WAITING_TIMEOUT: 1792 timeoutExecutor.add(procedure); 1793 break; 1794 default: 1795 break; 1796 } 1797 } 1798 return subprocs; 1799 } 1800 1801 private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { 1802 for (int i = 0; i < subprocs.length; ++i) { 1803 Procedure<TEnvironment> subproc = subprocs[i]; 1804 subproc.updateMetricsOnSubmit(getEnvironment()); 1805 assert !procedures.containsKey(subproc.getProcId()); 1806 procedures.put(subproc.getProcId(), subproc); 1807 scheduler.addFront(subproc); 1808 } 1809 } 1810 1811 private void countDownChildren(RootProcedureState<TEnvironment> procStack, 1812 
Procedure<TEnvironment> procedure) { 1813 Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); 1814 if (parent == null) { 1815 assert procStack.isRollingback(); 1816 return; 1817 } 1818 1819 // If this procedure is the last child awake the parent procedure 1820 if (parent.tryRunnable()) { 1821 // If we succeeded in making the parent runnable -- i.e. all of its 1822 // children have completed, move parent to front of the queue. 1823 store.update(parent); 1824 scheduler.addFront(parent); 1825 LOG.info("Finished subprocedure(s) of " + parent + "; resume parent processing."); 1826 return; 1827 } 1828 } 1829 1830 private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, 1831 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1832 if (subprocs != null && !procedure.isFailed()) { 1833 if (LOG.isTraceEnabled()) { 1834 LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); 1835 } 1836 store.insert(procedure, subprocs); 1837 } else { 1838 LOG.trace("Store update {}", procedure); 1839 if (procedure.isFinished() && !procedure.hasParent()) { 1840 // remove child procedures 1841 final long[] childProcIds = procStack.getSubprocedureIds(); 1842 if (childProcIds != null) { 1843 store.delete(procedure, childProcIds); 1844 for (int i = 0; i < childProcIds.length; ++i) { 1845 procedures.remove(childProcIds[i]); 1846 } 1847 } else { 1848 store.update(procedure); 1849 } 1850 } else { 1851 store.update(procedure); 1852 } 1853 } 1854 } 1855 1856 private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { 1857 LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); 1858 // NOTE: We don't call Thread.currentThread().interrupt() 1859 // because otherwise all the subsequent calls e.g. Thread.sleep() will throw 1860 // the InterruptedException. If the master is going down, we will be notified 1861 // and the executor/store will be stopped. 
1862 // (The interrupted procedure will be retried on the next run) 1863 } 1864 1865 private void execCompletionCleanup(Procedure<TEnvironment> proc) { 1866 final TEnvironment env = getEnvironment(); 1867 if (proc.hasLock()) { 1868 LOG.warn("Usually this should not happen, we will release the lock before if the procedure" + 1869 " is finished, even if the holdLock is true, arrive here means we have some holes where" + 1870 " we do not release the lock. And the releaseLock below may fail since the procedure may" + 1871 " have already been deleted from the procedure store."); 1872 releaseLock(proc, true); 1873 } 1874 try { 1875 proc.completionCleanup(env); 1876 } catch (Throwable e) { 1877 // Catch NullPointerExceptions or similar errors... 1878 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); 1879 } 1880 } 1881 1882 private void procedureFinished(Procedure<TEnvironment> proc) { 1883 // call the procedure completion cleanup handler 1884 execCompletionCleanup(proc); 1885 1886 CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); 1887 1888 // update the executor internal state maps 1889 if (!proc.shouldWaitClientAck(getEnvironment())) { 1890 retainer.setClientAckTime(0); 1891 } 1892 1893 completed.put(proc.getProcId(), retainer); 1894 rollbackStack.remove(proc.getProcId()); 1895 procedures.remove(proc.getProcId()); 1896 1897 // call the runnableSet completion cleanup handler 1898 try { 1899 scheduler.completionCleanup(proc); 1900 } catch (Throwable e) { 1901 // Catch NullPointerExceptions or similar errors... 
1902 LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); 1903 } 1904 1905 // Notify the listeners 1906 sendProcedureFinishedNotification(proc.getProcId()); 1907 } 1908 1909 RootProcedureState<TEnvironment> getProcStack(long rootProcId) { 1910 return rollbackStack.get(rootProcId); 1911 } 1912 1913 @VisibleForTesting 1914 ProcedureScheduler getProcedureScheduler() { 1915 return scheduler; 1916 } 1917 1918 @VisibleForTesting 1919 int getCompletedSize() { 1920 return completed.size(); 1921 } 1922 1923 @VisibleForTesting 1924 public IdLock getProcExecutionLock() { 1925 return procExecutionLock; 1926 } 1927 1928 // ========================================================================== 1929 // Worker Thread 1930 // ========================================================================== 1931 private class WorkerThread extends StoppableThread { 1932 private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); 1933 private volatile Procedure<TEnvironment> activeProcedure; 1934 1935 public WorkerThread(ThreadGroup group) { 1936 this(group, "PEWorker-"); 1937 } 1938 1939 protected WorkerThread(ThreadGroup group, String prefix) { 1940 super(group, prefix + workerId.incrementAndGet()); 1941 setDaemon(true); 1942 } 1943 1944 @Override 1945 public void sendStopSignal() { 1946 scheduler.signalAll(); 1947 } 1948 @Override 1949 public void run() { 1950 long lastUpdate = EnvironmentEdgeManager.currentTime(); 1951 try { 1952 while (isRunning() && keepAlive(lastUpdate)) { 1953 @SuppressWarnings("unchecked") 1954 Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); 1955 if (proc == null) { 1956 continue; 1957 } 1958 this.activeProcedure = proc; 1959 int activeCount = activeExecutorCount.incrementAndGet(); 1960 int runningCount = store.setRunningProcedureCount(activeCount); 1961 LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), 1962 runningCount, activeCount); 1963 
executionStartTime.set(EnvironmentEdgeManager.currentTime()); 1964 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1965 try { 1966 executeProcedure(proc); 1967 } catch (AssertionError e) { 1968 LOG.info("ASSERT pid=" + proc.getProcId(), e); 1969 throw e; 1970 } finally { 1971 procExecutionLock.releaseLockEntry(lockEntry); 1972 activeCount = activeExecutorCount.decrementAndGet(); 1973 runningCount = store.setRunningProcedureCount(activeCount); 1974 LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), 1975 runningCount, activeCount); 1976 this.activeProcedure = null; 1977 lastUpdate = EnvironmentEdgeManager.currentTime(); 1978 executionStartTime.set(Long.MAX_VALUE); 1979 } 1980 } 1981 } catch (Throwable t) { 1982 LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); 1983 } finally { 1984 LOG.trace("Worker terminated."); 1985 } 1986 workerThreads.remove(this); 1987 } 1988 1989 @Override 1990 public String toString() { 1991 Procedure<?> p = this.activeProcedure; 1992 return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")"); 1993 } 1994 1995 /** 1996 * @return the time since the current procedure is running 1997 */ 1998 public long getCurrentRunTime() { 1999 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); 2000 } 2001 2002 // core worker never timeout 2003 protected boolean keepAlive(long lastUpdate) { 2004 return true; 2005 } 2006 } 2007 2008 // A worker thread which can be added when core workers are stuck. Will timeout after 2009 // keepAliveTime if there is no procedure to run. 
2010 private final class KeepAliveWorkerThread extends WorkerThread { 2011 public KeepAliveWorkerThread(ThreadGroup group) { 2012 super(group, "KeepAlivePEWorker-"); 2013 } 2014 2015 @Override 2016 protected boolean keepAlive(long lastUpdate) { 2017 return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; 2018 } 2019 } 2020 2021 // ---------------------------------------------------------------------------- 2022 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the 2023 // full set of procedures pending and completed to write a compacted 2024 // version of the log (in case is a log)? 2025 // In theory no, procedures are have a short life, so at some point the store 2026 // will have the tracker saying everything is in the last log. 2027 // ---------------------------------------------------------------------------- 2028 2029 private final class WorkerMonitor extends InlineChore { 2030 public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = 2031 "hbase.procedure.worker.monitor.interval.msec"; 2032 private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec 2033 2034 public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = 2035 "hbase.procedure.worker.stuck.threshold.msec"; 2036 private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec 2037 2038 public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = 2039 "hbase.procedure.worker.add.stuck.percentage"; 2040 private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck 2041 2042 private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; 2043 private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; 2044 private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; 2045 2046 public WorkerMonitor() { 2047 refreshConfig(); 2048 } 2049 2050 @Override 2051 public void run() { 2052 final int stuckCount = checkForStuckWorkers(); 2053 checkThreadCount(stuckCount); 2054 2055 // refresh interval 
(poor man dynamic conf update) 2056 refreshConfig(); 2057 } 2058 2059 private int checkForStuckWorkers() { 2060 // check if any of the worker is stuck 2061 int stuckCount = 0; 2062 for (WorkerThread worker : workerThreads) { 2063 if (worker.getCurrentRunTime() < stuckThreshold) { 2064 continue; 2065 } 2066 2067 // WARN the worker is stuck 2068 stuckCount++; 2069 LOG.warn("Worker stuck {}, run time {}", worker, 2070 StringUtils.humanTimeDiff(worker.getCurrentRunTime())); 2071 } 2072 return stuckCount; 2073 } 2074 2075 private void checkThreadCount(final int stuckCount) { 2076 // nothing to do if there are no runnable tasks 2077 if (stuckCount < 1 || !scheduler.hasRunnables()) { 2078 return; 2079 } 2080 2081 // add a new thread if the worker stuck percentage exceed the threshold limit 2082 // and every handler is active. 2083 final float stuckPerc = ((float) stuckCount) / workerThreads.size(); 2084 // let's add new worker thread more aggressively, as they will timeout finally if there is no 2085 // work to do. 2086 if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) { 2087 final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup); 2088 workerThreads.add(worker); 2089 worker.start(); 2090 LOG.debug("Added new worker thread {}", worker); 2091 } 2092 } 2093 2094 private void refreshConfig() { 2095 addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, 2096 DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); 2097 timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, 2098 DEFAULT_WORKER_MONITOR_INTERVAL); 2099 stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, 2100 DEFAULT_WORKER_STUCK_THRESHOLD); 2101 } 2102 2103 @Override 2104 public int getTimeoutInterval() { 2105 return timeoutInterval; 2106 } 2107 } 2108}