001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import edu.umd.cs.findbugs.annotations.Nullable; 021import java.io.IOException; 022import java.io.UncheckedIOException; 023import java.util.ArrayDeque; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Deque; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Set; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.CopyOnWriteArrayList; 033import java.util.concurrent.Executor; 034import java.util.concurrent.Executors; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.concurrent.atomic.AtomicLong; 039import java.util.stream.Collectors; 040import java.util.stream.Stream; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 044import org.apache.hadoop.hbase.log.HBaseMarkers; 045import org.apache.hadoop.hbase.procedure2.Procedure.LockState; 
046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 048import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 049import org.apache.hadoop.hbase.procedure2.util.StringUtils; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 052import org.apache.hadoop.hbase.util.IdLock; 053import org.apache.hadoop.hbase.util.NonceKey; 054import org.apache.hadoop.hbase.util.Threads; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 060import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 061import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 062 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 064 065/** 066 * Thread Pool that executes the submitted procedures. 067 * The executor has a ProcedureStore associated. 068 * Each operation is logged and on restart the pending procedures are resumed. 069 * 070 * Unless the Procedure code throws an error (e.g. invalid user input) 071 * the procedure will complete (at some point in time), On restart the pending 072 * procedures are resumed and the once failed will be rolledback. 
073 * 074 * The user can add procedures to the executor via submitProcedure(proc) 075 * check for the finished state via isFinished(procId) 076 * and get the result via getResult(procId) 077 */ 078@InterfaceAudience.Private 079public class ProcedureExecutor<TEnvironment> { 080 private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); 081 082 public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; 083 private static final boolean DEFAULT_CHECK_OWNER_SET = false; 084 085 public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = 086 "hbase.procedure.worker.keep.alive.time.msec"; 087 private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); 088 089 public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; 090 static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min 091 092 public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl"; 093 static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min 094 095 /** 096 * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to 097 * break PE having it fail at various junctures. When non-null, testing is set to an instance of 098 * the below internal {@link Testing} class with flags set for the particular test. 099 */ 100 volatile Testing testing = null; 101 102 /** 103 * Class with parameters describing how to fail/die when in testing-context. 104 */ 105 public static class Testing { 106 protected volatile boolean killIfHasParent = true; 107 protected volatile boolean killIfSuspended = false; 108 109 /** 110 * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is 111 * persisting all the state it needs to recover after a crash. 
112 */ 113 protected volatile boolean killBeforeStoreUpdate = false; 114 protected volatile boolean toggleKillBeforeStoreUpdate = false; 115 116 /** 117 * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978 118 * is about a case where memory-state was being set after store to WAL where a crash could 119 * cause us to get stuck. This flag allows killing at what was a vulnerable time. 120 */ 121 protected volatile boolean killAfterStoreUpdate = false; 122 protected volatile boolean toggleKillAfterStoreUpdate = false; 123 124 protected boolean shouldKillBeforeStoreUpdate() { 125 final boolean kill = this.killBeforeStoreUpdate; 126 if (this.toggleKillBeforeStoreUpdate) { 127 this.killBeforeStoreUpdate = !kill; 128 LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); 129 } 130 return kill; 131 } 132 133 protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) { 134 if (isSuspended && !killIfSuspended) { 135 return false; 136 } 137 if (hasParent && !killIfHasParent) { 138 return false; 139 } 140 return shouldKillBeforeStoreUpdate(); 141 } 142 143 protected boolean shouldKillAfterStoreUpdate() { 144 final boolean kill = this.killAfterStoreUpdate; 145 if (this.toggleKillAfterStoreUpdate) { 146 this.killAfterStoreUpdate = !kill; 147 LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate); 148 } 149 return kill; 150 } 151 152 protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { 153 return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate(); 154 } 155 } 156 157 public interface ProcedureExecutorListener { 158 void procedureLoaded(long procId); 159 void procedureAdded(long procId); 160 void procedureFinished(long procId); 161 } 162 163 /** 164 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. 
* Once a Root-Procedure completes (success or failure), the result will be added to this map.
   * The user of ProcedureExecutor should call getResult(procId) to get the result.
   */
  private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed =
    new ConcurrentHashMap<>();

  /**
   * Map the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState.
   * The RootProcedureState contains the execution stack of the Root-Procedure.
   * It is added to the map by submitProcedure() and removed on procedure completion.
   */
  private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack =
    new ConcurrentHashMap<>();

  /**
   * Helper map to lookup the live procedures by ID.
   * This map contains every procedure: root-procedures and subprocedures.
   */
  private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures =
    new ConcurrentHashMap<>();

  /**
   * Helper map to look up whether a procedure was already issued by the same client. This map
   * contains every root procedure.
   */
  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>();

  /** Registered {@link ProcedureExecutorListener}s. */
  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
    new CopyOnWriteArrayList<>();

  // Not final: replaced by refreshConfiguration(Configuration).
  private Configuration conf;

  /**
   * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected).
   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
   * (Should be ok).
   */
  private ThreadGroup threadGroup;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected).
* Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
   * (Should be ok).
   */
  private CopyOnWriteArrayList<WorkerThread> workerThreads;

  /**
   * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
   * resource handling rather than observing in a #join is unexpected).
   * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
   * (Should be ok).
   */
  private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

  /**
   * WorkerMonitor checks for stuck workers and creates new worker threads when necessary. For
   * example, if there is no worker available to assign meta, it will create a new worker thread
   * for it, so it is very important.
   * TimeoutExecutor executes many tasks like DeadServerMetricRegionChore, RegionInTransitionChore
   * and so on; some of these tasks may run for a long time and would block other tasks such as
   * WorkerMonitor, so a dedicated thread is used for executing WorkerMonitor.
   */
  private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor;

  // Both sizes are assigned in init(): corePoolSize = numThreads, maxPoolSize = 10 * numThreads.
  private int corePoolSize;
  private int maxPoolSize;

  // Stored in milliseconds; see setKeepAliveTime(long, TimeUnit).
  private volatile long keepAliveTime;

  /**
   * Scheduler/Queue that contains runnable procedures.
   */
  private final ProcedureScheduler scheduler;

  // Single daemon thread that runs the force-update tasks submitted by the store listener
  // registered in the constructor.
  private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

  // Starts at -1; set by the store loader through setMaxProcId() and reset to -1 in join().
  private final AtomicLong lastProcId = new AtomicLong(-1);
  private final AtomicLong workerId = new AtomicLong(0);
  private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
  // Flipped to true by startWorkers() and back to false by stop().
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final TEnvironment environment;
  private final ProcedureStore store;

  // Read once in the constructor from CHECK_OWNER_SET_CONF_KEY.
  private final boolean checkOwnerSet;

  // To prevent concurrent execution of the same procedure.
253 // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the 254 // procedure is woken up before we finish the suspend which causes the same procedures to be 255 // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also 256 // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent 257 // execution of the same procedure. 258 private final IdLock procExecutionLock = new IdLock(); 259 260 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 261 final ProcedureStore store) { 262 this(conf, environment, store, new SimpleProcedureScheduler()); 263 } 264 265 private boolean isRootFinished(Procedure<?> proc) { 266 Procedure<?> rootProc = procedures.get(proc.getRootProcId()); 267 return rootProc == null || rootProc.isFinished(); 268 } 269 270 private void forceUpdateProcedure(long procId) throws IOException { 271 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); 272 try { 273 Procedure<TEnvironment> proc = procedures.get(procId); 274 if (proc != null) { 275 if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) { 276 LOG.debug("Procedure {} has already been finished and parent is succeeded," + 277 " skip force updating", proc); 278 return; 279 } 280 } else { 281 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 282 if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) { 283 LOG.debug("No pending procedure with id = {}, skip force updating.", procId); 284 return; 285 } 286 long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL); 287 long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL); 288 if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) { 289 LOG.debug("Procedure {} has already been finished and expired, skip force updating", 290 procId); 291 return; 292 } 293 proc = retainer.getProcedure(); 
294 } 295 LOG.debug("Force update procedure {}", proc); 296 store.update(proc); 297 } finally { 298 procExecutionLock.releaseLockEntry(lockEntry); 299 } 300 } 301 302 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 303 final ProcedureStore store, final ProcedureScheduler scheduler) { 304 this.environment = environment; 305 this.scheduler = scheduler; 306 this.store = store; 307 this.conf = conf; 308 this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); 309 refreshConfiguration(conf); 310 store.registerListener(new ProcedureStoreListener() { 311 312 @Override 313 public void forceUpdate(long[] procIds) { 314 Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> { 315 try { 316 forceUpdateProcedure(procId); 317 } catch (IOException e) { 318 LOG.warn("Failed to force update procedure with pid={}", procId); 319 } 320 })); 321 } 322 }); 323 } 324 325 private void load(final boolean abortOnCorruption) throws IOException { 326 Preconditions.checkArgument(completed.isEmpty(), "completed not empty"); 327 Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty"); 328 Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty"); 329 Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty"); 330 331 store.load(new ProcedureStore.ProcedureLoader() { 332 @Override 333 public void setMaxProcId(long maxProcId) { 334 assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()"; 335 lastProcId.set(maxProcId); 336 } 337 338 @Override 339 public void load(ProcedureIterator procIter) throws IOException { 340 loadProcedures(procIter, abortOnCorruption); 341 } 342 343 @Override 344 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 345 int corruptedCount = 0; 346 while (procIter.hasNext()) { 347 Procedure<?> proc = procIter.next(); 348 LOG.error("Corrupt " + proc); 349 corruptedCount++; 350 } 351 if 
(abortOnCorruption && corruptedCount > 0) { 352 throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay"); 353 } 354 } 355 }); 356 } 357 358 private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) { 359 proc.restoreLock(getEnvironment()); 360 restored.add(proc.getProcId()); 361 } 362 363 private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) { 364 while (!stack.isEmpty()) { 365 restoreLock(stack.pop(), restored); 366 } 367 } 368 369 // Restore the locks for all the procedures. 370 // Notice that we need to restore the locks starting from the root proc, otherwise there will be 371 // problem that a sub procedure may hold the exclusive lock first and then we are stuck when 372 // calling the acquireLock method for the parent procedure. 373 // The algorithm is straight-forward: 374 // 1. Use a set to record the procedures which locks have already been restored. 375 // 2. Use a stack to store the hierarchy of the procedures 376 // 3. For all the procedure, we will first try to find its parent and push it into the stack, 377 // unless 378 // a. We have no parent, i.e, we are the root procedure 379 // b. The lock has already been restored(by checking the set introduced in #1) 380 // then we start to pop the stack and call acquireLock for each procedure. 381 // Notice that this should be done for all procedures, not only the ones in runnableList. 
private void restoreLocks() {
    Set<Long> restored = new HashSet<>();
    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
    procedures.values().forEach(proc -> {
      // Walk from this procedure up towards its root, stacking every procedure whose lock has
      // not been restored yet, then unwind the stack so ancestors are restored first.
      for (;;) {
        if (restored.contains(proc.getProcId())) {
          restoreLocks(stack, restored);
          return;
        }
        if (!proc.hasParent()) {
          restoreLock(proc, restored);
          restoreLocks(stack, restored);
          return;
        }
        stack.push(proc);
        // NOTE(review): a parent missing from the map would make the next iteration fail with a
        // NullPointerException -- presumably orphans never reach this point; confirm.
        proc = procedures.get(proc.getParentProcId());
      }
    });
  }

  /**
   * Rebuild the in-memory state from the procedures handed over by the store and queue the
   * unfinished ones for execution. The iterator is walked twice: the first pass fills the
   * {@code completed}, {@code procedures}, {@code rollbackStack} and nonce maps, the second pass
   * (after {@code procIter.reset()}) rebuilds the rollback stacks and sorts the procedures by
   * state. Locks are restored before anything is pushed to the timeout executor or scheduler.
   * @param procIter iterator over the procedures to load
   * @param abortOnCorruption not read by this method
   * @throws IOException propagated from the iterator
   */
  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
      throws IOException {
    // 1. Build the rollback stack
    int runnableCount = 0;
    int failedCount = 0;
    int waitingCount = 0;
    int waitingTimeoutCount = 0;
    while (procIter.hasNext()) {
      boolean finished = procIter.isNextFinished();
      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      NonceKey nonceKey = proc.getNonceKey();
      long procId = proc.getProcId();

      if (finished) {
        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
        LOG.debug("Completed {}", proc);
      } else {
        if (!proc.hasParent()) {
          assert !proc.isFinished() : "unexpected finished procedure";
          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
        }

        // add the procedure to the map
        proc.beforeReplay(getEnvironment());
        procedures.put(proc.getProcId(), proc);
        // The counts are only used to pre-size the per-state lists built in step 2.
        switch (proc.getState()) {
          case RUNNABLE:
            runnableCount++;
            break;
          case FAILED:
            failedCount++;
            break;
          case WAITING:
            waitingCount++;
            break;
          case WAITING_TIMEOUT:
            waitingTimeoutCount++;
            break;
          default:
            break;
        }
      }

      if (nonceKey != null) {
        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
      }
    }

    // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
    // push it into the ProcedureScheduler directly to execute the rollback. But this does not work
    // after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
    // the queue from runQueue in scheduler, and then when a procedure which has lock access, for
    // example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
    // we will add the queue back to let the workers poll from it. The assumption here is that, the
    // procedure which has the xlock should have been polled out already, so when loading we can not
    // add the procedure to scheduler first and then call acquireLock, since the procedure is still
    // in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
    // then there is a dead lock
    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
    procIter.reset();
    while (procIter.hasNext()) {
      if (procIter.isNextFinished()) {
        procIter.skipNext();
        continue;
      }

      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
      LOG.debug("Loading {}", proc);
      Long rootProcId = getRootProcedureId(proc);
      // The orphan procedures will be passed to handleCorrupted, so add an assert here
      assert rootProcId != null;

      if (proc.hasParent()) {
        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
        if (parent != null && !proc.isFinished()) {
          parent.incChildrenLatch();
        }
      }

      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
      procStack.loadStack(proc);

      proc.setRootProcId(rootProcId);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableList.add(proc);
          break;
        case WAITING:
          waitingList.add(proc);
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutList.add(proc);
          break;
        case FAILED:
          failedList.add(proc);
          break;
        case ROLLEDBACK:
        case INITIALIZING:
          String msg = "Unexpected " + proc.getState() + " state for " + proc;
          LOG.error(msg);
          throw new UnsupportedOperationException(msg);
        default:
          break;
      }
    }

    // 3. Check the waiting procedures to see if some of them can be added to runnable.
    waitingList.forEach(proc -> {
      if (!proc.hasChildren()) {
        // Normally, WAITING procedures should be woken up by their children. But there is a case
        // where all the children are successful and, before they can wake up their parent
        // procedure, the master is killed. Then, during recovery of the procedures from the
        // ProcedureWal, the children are not loaded because of their SUCCESS state. So we need to
        // continue to run this WAITING procedure. But before executing, we need to set its state
        // to RUNNABLE, otherwise an exception will be thrown by:
        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        // "NOT RUNNABLE! " + procedure.toString());
        proc.setState(ProcedureState.RUNNABLE);
        runnableList.add(proc);
      } else {
        proc.afterReplay(getEnvironment());
      }
    });
    // 4. restore locks
    restoreLocks();

    // 5. Push the procedures to the timeout executor
    waitingTimeoutList.forEach(proc -> {
      proc.afterReplay(getEnvironment());
      timeoutExecutor.add(proc);
    });

    // 6. Push the procedure to the scheduler
    failedList.forEach(scheduler::addBack);
    runnableList.forEach(p -> {
      p.afterReplay(getEnvironment());
      if (!p.hasParent()) {
        sendProcedureLoadedNotification(p.getProcId());
      }
      scheduler.addBack(p);
    });
    // After all procedures put into the queue, signal the worker threads.
    // Otherwise, there is a race condition. See HBASE-21364.
    scheduler.signalAll();
  }

  /**
   * Initialize the procedure executor, but do not start workers. We will start them later.
   * <p/>
   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
   * ensure a single executor, and start the procedure replay to resume and recover the previous
   * pending and in-progress procedures.
   * @param numThreads number of threads available for procedure execution.
   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
   *          is found on replay. otherwise false.
   * @throws IOException propagated from the store lease recovery or the procedure load
   */
  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
    // We have numThreads executor + one timer thread used for timing out
    // procedures and triggering periodic procedures.
    this.corePoolSize = numThreads;
    this.maxPoolSize = 10 * numThreads;
    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
        corePoolSize, maxPoolSize);

    this.threadGroup = new ThreadGroup("PEWorkerGroup");
    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

    // Create the workers
    workerId.set(0);
    workerThreads = new CopyOnWriteArrayList<>();
    for (int i = 0; i < corePoolSize; ++i) {
      workerThreads.add(new WorkerThread(threadGroup));
    }

    long st, et;

    // Acquire the store lease.
    st = System.nanoTime();
    store.recoverLease();
    et = System.nanoTime();
    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));

    // start the procedure scheduler
    scheduler.start();

    // TODO: Split in two steps.
    // TODO: Handle corrupted procedures (currently just a warn)
    // The first one will make sure that we have the latest id,
    // so we can start the threads and accept new procedures.
    // The second step will do the actual load of old procedures.
    st = System.nanoTime();
    load(abortOnCorruption);
    et = System.nanoTime();
    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
  }

  /**
   * Start the workers. Logs a warning and returns if the executor is already running.
   */
  public void startWorkers() throws IOException {
    if (!running.compareAndSet(false, true)) {
      LOG.warn("Already running");
      return;
    }
    // Start the executors. Here we must have the lastProcId set.
    LOG.trace("Start workers {}", workerThreads.size());
    timeoutExecutor.start();
    workerMonitorExecutor.start();
    for (WorkerThread worker: workerThreads) {
      worker.start();
    }

    // Internal chores
    workerMonitorExecutor.add(new WorkerMonitor());

    // Add completed cleaner chore
    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
      nonceKeysToProcIdsMap));
  }

  /**
   * Mark the executor as not running, stop the scheduler and send the stop signal to the timeout
   * and worker-monitor executors. Does nothing if the executor was not running. This method does
   * not wait for the threads; see {@link #join()}.
   */
  public void stop() {
    if (!running.getAndSet(false)) {
      return;
    }

    LOG.info("Stopping");
    scheduler.stop();
    timeoutExecutor.sendStopSignal();
    workerMonitorExecutor.sendStopSignal();
  }

  /**
   * Wait for the timeout executor, the worker-monitor executor and every worker thread to
   * terminate, destroy the thread group, then clear the in-memory state. Expects the executor to
   * have been stopped already.
   */
  @VisibleForTesting
  public void join() {
    assert !isRunning() : "expected not running";

    // stop the timeout executor
    timeoutExecutor.awaitTermination();
    // stop the work monitor executor
    workerMonitorExecutor.awaitTermination();

    // stop the worker threads
    for (WorkerThread worker: workerThreads) {
      worker.awaitTermination();
    }

    // Destroy the Thread Group for the executors
    // TODO: Fix. #join is not place to destroy resources.
    try {
      threadGroup.destroy();
    } catch (IllegalThreadStateException e) {
      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
        this.threadGroup, e.getMessage());
      // This dumps list of threads on STDOUT.
      this.threadGroup.list();
    }

    // reset the in-memory state for testing
    completed.clear();
    rollbackStack.clear();
    procedures.clear();
    nonceKeysToProcIdsMap.clear();
    scheduler.clear();
    lastProcId.set(-1);
  }

  /**
   * Replace the configuration held by this executor and re-read the worker keep-alive time
   * from it.
   * @param conf the new configuration
   */
  public void refreshConfiguration(final Configuration conf) {
    this.conf = conf;
    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
  }

  // ==========================================================================
  //  Accessors
  // ==========================================================================
  /**
   * @return true between a successful {@link #startWorkers()} and the following {@link #stop()}
   */
  public boolean isRunning() {
    return running.get();
  }

  /**
   * @return the current number of worker threads.
   */
  public int getWorkerThreadCount() {
    return workerThreads.size();
  }

  /**
   * @return the core pool size settings.
   */
  public int getCorePoolSize() {
    return corePoolSize;
  }

  /**
   * @return the current value of the active executor counter
   */
  public int getActiveExecutorCount() {
    return activeExecutorCount.get();
  }

  /**
   * @return the environment passed to the constructor
   */
  public TEnvironment getEnvironment() {
    return this.environment;
  }

  /**
   * @return the procedure store passed to the constructor
   */
  public ProcedureStore getStore() {
    return this.store;
  }

  ProcedureScheduler getScheduler() {
    return scheduler;
  }

  /**
   * Set the worker keep-alive time and signal the scheduler.
   * @param keepAliveTime the new keep-alive time
   * @param timeUnit unit of {@code keepAliveTime}; the value is stored in milliseconds
   */
  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
    this.scheduler.signalAll();
  }

  /**
   * @param timeUnit unit to express the result in
   * @return the worker keep-alive time converted to {@code timeUnit}
   */
  public long getKeepAliveTime(final TimeUnit timeUnit) {
    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
  }

  // ==========================================================================
  //  Submit/Remove Chores
  // ==========================================================================

  /**
   * Add a chore procedure to the executor
   * @param chore the chore to add; a null chore is ignored
   */
  public void addChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
    if (chore == null) {
      return;
    }
    chore.setState(ProcedureState.WAITING_TIMEOUT);
    timeoutExecutor.add(chore);
  }

  /**
   * Remove a chore procedure from the executor
   * @param chore the chore to remove; for a null chore this method returns true
   * @return whether the chore is removed, or it will be removed later
   */
  public boolean removeChore(@Nullable ProcedureInMemoryChore<TEnvironment> chore) {
    if (chore == null) {
      return true;
    }
    chore.setState(ProcedureState.SUCCESS);
    return timeoutExecutor.remove(chore);
  }

  // ==========================================================================
  //  Nonce Procedure helpers
  // ==========================================================================
  /**
   * Create a NonceKey from the specified nonceGroup and nonce.
   * @param nonceGroup the group to use for the {@link NonceKey}
   * @param nonce the nonce to use in the {@link NonceKey}
   * @return the generated NonceKey, or null when {@code nonce} is {@link HConstants#NO_NONCE}
   */
  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
  }

  /**
   * Register a nonce for a procedure that is going to be submitted.
   * A procId will be reserved and on submitProcedure(),
   * the procedure with the specified nonce will take the reserved ProcId.
   * If someone already reserved the nonce, this method will return the procId reserved,
   * otherwise an invalid procId will be returned and the caller should proceed
   * and submit the procedure.
   *
   * @param nonceKey A unique identifier for this operation from the client or process.
   * @return the procId associated with the nonce, if any otherwise an invalid procId.
780 */ 781 public long registerNonce(final NonceKey nonceKey) { 782 if (nonceKey == null) { 783 return -1; 784 } 785 786 // check if we have already a Reserved ID for the nonce 787 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); 788 if (oldProcId == null) { 789 // reserve a new Procedure ID, this will be associated with the nonce 790 // and the procedure submitted with the specified nonce will use this ID. 791 final long newProcId = nextProcId(); 792 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); 793 if (oldProcId == null) { 794 return -1; 795 } 796 } 797 798 // we found a registered nonce, but the procedure may not have been submitted yet. 799 // since the client expect the procedure to be submitted, spin here until it is. 800 final boolean traceEnabled = LOG.isTraceEnabled(); 801 while (isRunning() && 802 !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && 803 nonceKeysToProcIdsMap.containsKey(nonceKey)) { 804 if (traceEnabled) { 805 LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted"); 806 } 807 Threads.sleep(100); 808 } 809 return oldProcId.longValue(); 810 } 811 812 /** 813 * Remove the NonceKey if the procedure was not submitted to the executor. 814 * @param nonceKey A unique identifier for this operation from the client or process. 815 */ 816 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { 817 if (nonceKey == null) { 818 return; 819 } 820 821 final Long procId = nonceKeysToProcIdsMap.get(nonceKey); 822 if (procId == null) { 823 return; 824 } 825 826 // if the procedure was not submitted, remove the nonce 827 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { 828 nonceKeysToProcIdsMap.remove(nonceKey); 829 } 830 } 831 832 /** 833 * If the failure failed before submitting it, we may want to give back the 834 * same error to the requests with the same nonceKey. 
835 * 836 * @param nonceKey A unique identifier for this operation from the client or process 837 * @param procName name of the procedure, used to inform the user 838 * @param procOwner name of the owner of the procedure, used to inform the user 839 * @param exception the failure to report to the user 840 */ 841 public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, 842 IOException exception) { 843 if (nonceKey == null) { 844 return; 845 } 846 847 Long procId = nonceKeysToProcIdsMap.get(nonceKey); 848 if (procId == null || completed.containsKey(procId)) { 849 return; 850 } 851 852 Procedure<TEnvironment> proc = 853 new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); 854 855 completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc)); 856 } 857 858 // ========================================================================== 859 // Submit/Abort Procedure 860 // ========================================================================== 861 /** 862 * Add a new root-procedure to the executor. 863 * @param proc the new procedure to execute. 864 * @return the procedure id, that can be used to monitor the operation 865 */ 866 public long submitProcedure(Procedure<TEnvironment> proc) { 867 return submitProcedure(proc, null); 868 } 869 870 /** 871 * Bypass a procedure. If the procedure is set to bypass, all the logic in 872 * execute/rollback will be ignored and it will return success, whatever. 873 * It is used to recover buggy stuck procedures, releasing the lock resources 874 * and letting other procedures run. Bypassing one procedure (and its ancestors will 875 * be bypassed automatically) may leave the cluster in a middle state, e.g. region 876 * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, 877 * the operators may have to do some clean up on hdfs or schedule some assign procedures 878 * to let region online. DO AT YOUR OWN RISK. 
   * <p>
   * A procedure can be bypassed only if
   * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT
   * or it is a root procedure without any child.
   * 2. No other worker thread is executing it
   * 3. No child procedure has been submitted
   *
   * <p>
   * If all the requirements are met, the procedure and its ancestors will be
   * bypassed and persisted to WAL.
   *
   * <p>
   * If the procedure is in WAITING state, it will be set to RUNNABLE and added to the run queue.
   * TODO: What about WAITING_TIMEOUT?
   * @param pids the ids of the procedures to bypass
   * @param lockWait time to wait for the procedure execution lock
   * @param force if force is set to true, we will bypass the procedure even if it is executing.
   *   This is for procedures which can't break out while executing (due to a bug, mostly).
   *   In this case, bypassing the procedure is not enough, since it is already stuck
   *   there. We need to restart the master after bypassing, letting the problematic
   *   procedure execute with bypass=true, so in that condition, the procedure can be
   *   successfully bypassed.
   * @param recursive We will do an expensive search for children of each pid. EXPENSIVE!
902 * @return true if bypass success 903 * @throws IOException IOException 904 */ 905 public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, 906 boolean recursive) 907 throws IOException { 908 List<Boolean> result = new ArrayList<Boolean>(pids.size()); 909 for(long pid: pids) { 910 result.add(bypassProcedure(pid, lockWait, force, recursive)); 911 } 912 return result; 913 } 914 915 boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) 916 throws IOException { 917 Preconditions.checkArgument(lockWait > 0, "lockWait should be positive"); 918 final Procedure<TEnvironment> procedure = getProcedure(pid); 919 if (procedure == null) { 920 LOG.debug("Procedure pid={} does not exist, skipping bypass", pid); 921 return false; 922 } 923 924 LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", 925 procedure, lockWait, override, recursive); 926 IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); 927 if (lockEntry == null && !override) { 928 LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", 929 lockWait, procedure, override); 930 return false; 931 } else if (lockEntry == null) { 932 LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", 933 lockWait, procedure, override); 934 } 935 try { 936 // check whether the procedure is already finished 937 if (procedure.isFinished()) { 938 LOG.debug("{} is already finished, skipping bypass", procedure); 939 return false; 940 } 941 942 if (procedure.hasChildren()) { 943 if (recursive) { 944 // EXPENSIVE. Checks each live procedure of which there could be many!!! 945 // Is there another way to get children of a procedure? 946 LOG.info("Recursive bypass on children of pid={}", procedure.getProcId()); 947 this.procedures.forEachValue(1 /*Single-threaded*/, 948 // Transformer 949 v -> v.getParentProcId() == procedure.getProcId()? 
v: null, 950 // Consumer 951 v -> { 952 try { 953 bypassProcedure(v.getProcId(), lockWait, override, recursive); 954 } catch (IOException e) { 955 LOG.warn("Recursive bypass of pid={}", v.getProcId(), e); 956 } 957 }); 958 } else { 959 LOG.debug("{} has children, skipping bypass", procedure); 960 return false; 961 } 962 } 963 964 // If the procedure has no parent or no child, we are safe to bypass it in whatever state 965 if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE 966 && procedure.getState() != ProcedureState.WAITING 967 && procedure.getState() != ProcedureState.WAITING_TIMEOUT) { 968 LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " 969 + "(with no parent), {}", 970 procedure); 971 // Question: how is the bypass done here? 972 return false; 973 } 974 975 // Now, the procedure is not finished, and no one can execute it since we take the lock now 976 // And we can be sure that its ancestor is not running too, since their child has not 977 // finished yet 978 Procedure<TEnvironment> current = procedure; 979 while (current != null) { 980 LOG.debug("Bypassing {}", current); 981 current.bypass(getEnvironment()); 982 store.update(procedure); 983 long parentID = current.getParentProcId(); 984 current = getProcedure(parentID); 985 } 986 987 //wake up waiting procedure, already checked there is no child 988 if (procedure.getState() == ProcedureState.WAITING) { 989 procedure.setState(ProcedureState.RUNNABLE); 990 store.update(procedure); 991 } 992 993 // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. 
994 // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE 995 if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 996 LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); 997 if (timeoutExecutor.remove(procedure)) { 998 LOG.debug("removed procedure {} from timeoutExecutor", procedure); 999 timeoutExecutor.executeTimedoutProcedure(procedure); 1000 } 1001 } else if (lockEntry != null) { 1002 scheduler.addFront(procedure); 1003 LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure); 1004 } else { 1005 // If we don't have the lock, we can't re-submit the queue, 1006 // since it is already executing. To get rid of the stuck situation, we 1007 // need to restart the master. With the procedure set to bypass, the procedureExecutor 1008 // will bypass it and won't get stuck again. 1009 LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " 1010 + "skipping add to queue", 1011 procedure); 1012 } 1013 return true; 1014 1015 } finally { 1016 if (lockEntry != null) { 1017 procExecutionLock.releaseLockEntry(lockEntry); 1018 } 1019 } 1020 } 1021 1022 /** 1023 * Add a new root-procedure to the executor. 1024 * @param proc the new procedure to execute. 1025 * @param nonceKey the registered unique identifier for this operation from the client or process. 
1026 * @return the procedure id, that can be used to monitor the operation 1027 */ 1028 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", 1029 justification = "FindBugs is blind to the check-for-null") 1030 public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { 1031 Preconditions.checkArgument(lastProcId.get() >= 0); 1032 1033 prepareProcedure(proc); 1034 1035 final Long currentProcId; 1036 if (nonceKey != null) { 1037 currentProcId = nonceKeysToProcIdsMap.get(nonceKey); 1038 Preconditions.checkArgument(currentProcId != null, 1039 "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc); 1040 } else { 1041 currentProcId = nextProcId(); 1042 } 1043 1044 // Initialize the procedure 1045 proc.setNonceKey(nonceKey); 1046 proc.setProcId(currentProcId.longValue()); 1047 1048 // Commit the transaction 1049 store.insert(proc, null); 1050 LOG.debug("Stored {}", proc); 1051 1052 // Add the procedure to the executor 1053 return pushProcedure(proc); 1054 } 1055 1056 /** 1057 * Add a set of new root-procedure to the executor. 1058 * @param procs the new procedures to execute. 1059 */ 1060 // TODO: Do we need to take nonces here? 
1061 public void submitProcedures(Procedure<TEnvironment>[] procs) { 1062 Preconditions.checkArgument(lastProcId.get() >= 0); 1063 if (procs == null || procs.length <= 0) { 1064 return; 1065 } 1066 1067 // Prepare procedure 1068 for (int i = 0; i < procs.length; ++i) { 1069 prepareProcedure(procs[i]).setProcId(nextProcId()); 1070 } 1071 1072 // Commit the transaction 1073 store.insert(procs); 1074 if (LOG.isDebugEnabled()) { 1075 LOG.debug("Stored " + Arrays.toString(procs)); 1076 } 1077 1078 // Add the procedure to the executor 1079 for (int i = 0; i < procs.length; ++i) { 1080 pushProcedure(procs[i]); 1081 } 1082 } 1083 1084 private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { 1085 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); 1086 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); 1087 if (this.checkOwnerSet) { 1088 Preconditions.checkArgument(proc.hasOwner(), "missing owner"); 1089 } 1090 return proc; 1091 } 1092 1093 private long pushProcedure(Procedure<TEnvironment> proc) { 1094 final long currentProcId = proc.getProcId(); 1095 1096 // Update metrics on start of a procedure 1097 proc.updateMetricsOnSubmit(getEnvironment()); 1098 1099 // Create the rollback stack for the procedure 1100 RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); 1101 rollbackStack.put(currentProcId, stack); 1102 1103 // Submit the new subprocedures 1104 assert !procedures.containsKey(currentProcId); 1105 procedures.put(currentProcId, proc); 1106 sendProcedureAddedNotification(currentProcId); 1107 scheduler.addBack(proc); 1108 return proc.getProcId(); 1109 } 1110 1111 /** 1112 * Send an abort notification the specified procedure. 1113 * Depending on the procedure implementation the abort can be considered or ignored. 1114 * @param procId the procedure to abort 1115 * @return true if the procedure exists and has received the abort, otherwise false. 
1116 */ 1117 public boolean abort(long procId) { 1118 return abort(procId, true); 1119 } 1120 1121 /** 1122 * Send an abort notification to the specified procedure. 1123 * Depending on the procedure implementation, the abort can be considered or ignored. 1124 * @param procId the procedure to abort 1125 * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? 1126 * @return true if the procedure exists and has received the abort, otherwise false. 1127 */ 1128 public boolean abort(long procId, boolean mayInterruptIfRunning) { 1129 Procedure<TEnvironment> proc = procedures.get(procId); 1130 if (proc != null) { 1131 if (!mayInterruptIfRunning && proc.wasExecuted()) { 1132 return false; 1133 } 1134 return proc.abort(getEnvironment()); 1135 } 1136 return false; 1137 } 1138 1139 // ========================================================================== 1140 // Executor query helpers 1141 // ========================================================================== 1142 public Procedure<TEnvironment> getProcedure(final long procId) { 1143 return procedures.get(procId); 1144 } 1145 1146 public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { 1147 Procedure<TEnvironment> proc = getProcedure(procId); 1148 if (clazz.isInstance(proc)) { 1149 return clazz.cast(proc); 1150 } 1151 return null; 1152 } 1153 1154 public Procedure<TEnvironment> getResult(long procId) { 1155 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1156 if (retainer == null) { 1157 return null; 1158 } else { 1159 return retainer.getProcedure(); 1160 } 1161 } 1162 1163 /** 1164 * Return true if the procedure is finished. 1165 * The state may be "completed successfully" or "failed and rolledback". 1166 * Use getResult() to check the state or get the result data. 1167 * @param procId the ID of the procedure to check 1168 * @return true if the procedure execution is finished, otherwise false. 
1169 */ 1170 public boolean isFinished(final long procId) { 1171 return !procedures.containsKey(procId); 1172 } 1173 1174 /** 1175 * Return true if the procedure is started. 1176 * @param procId the ID of the procedure to check 1177 * @return true if the procedure execution is started, otherwise false. 1178 */ 1179 public boolean isStarted(long procId) { 1180 Procedure<?> proc = procedures.get(procId); 1181 if (proc == null) { 1182 return completed.get(procId) != null; 1183 } 1184 return proc.wasExecuted(); 1185 } 1186 1187 /** 1188 * Mark the specified completed procedure, as ready to remove. 1189 * @param procId the ID of the procedure to remove 1190 */ 1191 public void removeResult(long procId) { 1192 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1193 if (retainer == null) { 1194 assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; 1195 LOG.debug("pid={} already removed by the cleaner.", procId); 1196 return; 1197 } 1198 1199 // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 1200 retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); 1201 } 1202 1203 public Procedure<TEnvironment> getResultOrProcedure(long procId) { 1204 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1205 if (retainer == null) { 1206 return procedures.get(procId); 1207 } else { 1208 return retainer.getProcedure(); 1209 } 1210 } 1211 1212 /** 1213 * Check if the user is this procedure's owner 1214 * @param procId the target procedure 1215 * @param user the user 1216 * @return true if the user is the owner of the procedure, 1217 * false otherwise or the owner is unknown. 
1218 */ 1219 public boolean isProcedureOwner(long procId, User user) { 1220 if (user == null) { 1221 return false; 1222 } 1223 final Procedure<TEnvironment> runningProc = procedures.get(procId); 1224 if (runningProc != null) { 1225 return runningProc.getOwner().equals(user.getShortName()); 1226 } 1227 1228 final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1229 if (retainer != null) { 1230 return retainer.getProcedure().getOwner().equals(user.getShortName()); 1231 } 1232 1233 // Procedure either does not exist or has already completed and got cleaned up. 1234 // At this time, we cannot check the owner of the procedure 1235 return false; 1236 } 1237 1238 /** 1239 * Should only be used when starting up, where the procedure workers have not been started. 1240 * <p/> 1241 * If the procedure works has been started, the return values maybe changed when you are 1242 * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as 1243 * it will do a copy, and also include the finished procedures. 1244 */ 1245 public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() { 1246 return procedures.values(); 1247 } 1248 1249 /** 1250 * Get procedures. 1251 * @return the procedures in a list 1252 */ 1253 public List<Procedure<TEnvironment>> getProcedures() { 1254 List<Procedure<TEnvironment>> procedureList = 1255 new ArrayList<>(procedures.size() + completed.size()); 1256 procedureList.addAll(procedures.values()); 1257 // Note: The procedure could show up twice in the list with different state, as 1258 // it could complete after we walk through procedures list and insert into 1259 // procedureList - it is ok, as we will use the information in the Procedure 1260 // to figure it out; to prevent this would increase the complexity of the logic. 
1261 completed.values().stream().map(CompletedProcedureRetainer::getProcedure) 1262 .forEach(procedureList::add); 1263 return procedureList; 1264 } 1265 1266 // ========================================================================== 1267 // Listeners helpers 1268 // ========================================================================== 1269 public void registerListener(ProcedureExecutorListener listener) { 1270 this.listeners.add(listener); 1271 } 1272 1273 public boolean unregisterListener(ProcedureExecutorListener listener) { 1274 return this.listeners.remove(listener); 1275 } 1276 1277 private void sendProcedureLoadedNotification(final long procId) { 1278 if (!this.listeners.isEmpty()) { 1279 for (ProcedureExecutorListener listener: this.listeners) { 1280 try { 1281 listener.procedureLoaded(procId); 1282 } catch (Throwable e) { 1283 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1284 } 1285 } 1286 } 1287 } 1288 1289 private void sendProcedureAddedNotification(final long procId) { 1290 if (!this.listeners.isEmpty()) { 1291 for (ProcedureExecutorListener listener: this.listeners) { 1292 try { 1293 listener.procedureAdded(procId); 1294 } catch (Throwable e) { 1295 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1296 } 1297 } 1298 } 1299 } 1300 1301 private void sendProcedureFinishedNotification(final long procId) { 1302 if (!this.listeners.isEmpty()) { 1303 for (ProcedureExecutorListener listener: this.listeners) { 1304 try { 1305 listener.procedureFinished(procId); 1306 } catch (Throwable e) { 1307 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1308 } 1309 } 1310 } 1311 } 1312 1313 // ========================================================================== 1314 // Procedure IDs helpers 1315 // ========================================================================== 1316 private long nextProcId() { 1317 long procId = lastProcId.incrementAndGet(); 1318 if (procId < 0) { 
1319 while (!lastProcId.compareAndSet(procId, 0)) { 1320 procId = lastProcId.get(); 1321 if (procId >= 0) { 1322 break; 1323 } 1324 } 1325 while (procedures.containsKey(procId)) { 1326 procId = lastProcId.incrementAndGet(); 1327 } 1328 } 1329 assert procId >= 0 : "Invalid procId " + procId; 1330 return procId; 1331 } 1332 1333 @VisibleForTesting 1334 protected long getLastProcId() { 1335 return lastProcId.get(); 1336 } 1337 1338 @VisibleForTesting 1339 public Set<Long> getActiveProcIds() { 1340 return procedures.keySet(); 1341 } 1342 1343 Long getRootProcedureId(Procedure<TEnvironment> proc) { 1344 return Procedure.getRootProcedureId(procedures, proc); 1345 } 1346 1347 // ========================================================================== 1348 // Executions 1349 // ========================================================================== 1350 private void executeProcedure(Procedure<TEnvironment> proc) { 1351 if (proc.isFinished()) { 1352 LOG.debug("{} is already finished, skipping execution", proc); 1353 return; 1354 } 1355 final Long rootProcId = getRootProcedureId(proc); 1356 if (rootProcId == null) { 1357 // The 'proc' was ready to run but the root procedure was rolledback 1358 LOG.warn("Rollback because parent is done/rolledback proc=" + proc); 1359 executeRollback(proc); 1360 return; 1361 } 1362 1363 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 1364 if (procStack == null) { 1365 LOG.warn("RootProcedureState is null for " + proc.getProcId()); 1366 return; 1367 } 1368 do { 1369 // Try to acquire the execution 1370 if (!procStack.acquire(proc)) { 1371 if (procStack.setRollback()) { 1372 // we have the 'rollback-lock' we can start rollingback 1373 switch (executeRollback(rootProcId, procStack)) { 1374 case LOCK_ACQUIRED: 1375 break; 1376 case LOCK_YIELD_WAIT: 1377 procStack.unsetRollback(); 1378 scheduler.yield(proc); 1379 break; 1380 case LOCK_EVENT_WAIT: 1381 LOG.info("LOCK_EVENT_WAIT rollback..." 
+ proc); 1382 procStack.unsetRollback(); 1383 break; 1384 default: 1385 throw new UnsupportedOperationException(); 1386 } 1387 } else { 1388 // if we can't rollback means that some child is still running. 1389 // the rollback will be executed after all the children are done. 1390 // If the procedure was never executed, remove and mark it as rolledback. 1391 if (!proc.wasExecuted()) { 1392 switch (executeRollback(proc)) { 1393 case LOCK_ACQUIRED: 1394 break; 1395 case LOCK_YIELD_WAIT: 1396 scheduler.yield(proc); 1397 break; 1398 case LOCK_EVENT_WAIT: 1399 LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); 1400 break; 1401 default: 1402 throw new UnsupportedOperationException(); 1403 } 1404 } 1405 } 1406 break; 1407 } 1408 1409 // Execute the procedure 1410 assert proc.getState() == ProcedureState.RUNNABLE : proc; 1411 // Note that lock is NOT about concurrency but rather about ensuring 1412 // ownership of a procedure of an entity such as a region or table 1413 LockState lockState = acquireLock(proc); 1414 switch (lockState) { 1415 case LOCK_ACQUIRED: 1416 execProcedure(procStack, proc); 1417 break; 1418 case LOCK_YIELD_WAIT: 1419 LOG.info(lockState + " " + proc); 1420 scheduler.yield(proc); 1421 break; 1422 case LOCK_EVENT_WAIT: 1423 // Someone will wake us up when the lock is available 1424 LOG.debug(lockState + " " + proc); 1425 break; 1426 default: 1427 throw new UnsupportedOperationException(); 1428 } 1429 procStack.release(proc); 1430 1431 if (proc.isSuccess()) { 1432 // update metrics on finishing the procedure 1433 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); 1434 LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); 1435 // Finalize the procedure state 1436 if (proc.getProcId() == rootProcId) { 1437 procedureFinished(proc); 1438 } else { 1439 execCompletionCleanup(proc); 1440 } 1441 break; 1442 } 1443 } while (procStack.isFailed()); 1444 } 1445 1446 private LockState 
acquireLock(Procedure<TEnvironment> proc) { 1447 TEnvironment env = getEnvironment(); 1448 // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if 1449 // hasLock is true. 1450 if (proc.hasLock()) { 1451 return LockState.LOCK_ACQUIRED; 1452 } 1453 return proc.doAcquireLock(env, store); 1454 } 1455 1456 private void releaseLock(Procedure<TEnvironment> proc, boolean force) { 1457 TEnvironment env = getEnvironment(); 1458 // For how the framework works, we know that we will always have the lock 1459 // when we call releaseLock(), so we can avoid calling proc.hasLock() 1460 if (force || !proc.holdLock(env) || proc.isFinished()) { 1461 proc.doReleaseLock(env, store); 1462 } 1463 } 1464 1465 /** 1466 * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the 1467 * root-procedure will be visible as finished to user, and the result will be the fatal exception. 1468 */ 1469 private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { 1470 Procedure<TEnvironment> rootProc = procedures.get(rootProcId); 1471 RemoteProcedureException exception = rootProc.getException(); 1472 // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are 1473 // rolling back because the subprocedure does. Clarify. 1474 if (exception == null) { 1475 exception = procStack.getException(); 1476 rootProc.setFailure(exception); 1477 store.update(rootProc); 1478 } 1479 1480 List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); 1481 assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; 1482 1483 int stackTail = subprocStack.size(); 1484 while (stackTail-- > 0) { 1485 Procedure<TEnvironment> proc = subprocStack.get(stackTail); 1486 IdLock.Entry lockEntry = null; 1487 // Hold the execution lock if it is not held by us. 
The IdLock is not reentrant so we need 1488 // this check, as the worker will hold the lock before executing a procedure. This is the only 1489 // place where we may hold two procedure execution locks, and there is a fence in the 1490 // RootProcedureState where we can make sure that only one worker can execute the rollback of 1491 // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to 1492 // prevent race between us and the force update thread. 1493 if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) { 1494 try { 1495 lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1496 } catch (IOException e) { 1497 // can only happen if interrupted, so not a big deal to propagate it 1498 throw new UncheckedIOException(e); 1499 } 1500 } 1501 try { 1502 // For the sub procedures which are successfully finished, we do not rollback them. 1503 // Typically, if we want to rollback a procedure, we first need to rollback it, and then 1504 // recursively rollback its ancestors. The state changes which are done by sub procedures 1505 // should be handled by parent procedures when rolling back. For example, when rolling back 1506 // a MergeTableProcedure, we will schedule new procedures to bring the offline regions 1507 // online, instead of rolling back the original procedures which offlined the regions(in 1508 // fact these procedures can not be rolled back...). 
1509 if (proc.isSuccess()) { 1510 // Just do the cleanup work, without actually executing the rollback 1511 subprocStack.remove(stackTail); 1512 cleanupAfterRollbackOneStep(proc); 1513 continue; 1514 } 1515 LockState lockState = acquireLock(proc); 1516 if (lockState != LockState.LOCK_ACQUIRED) { 1517 // can't take a lock on the procedure, add the root-proc back on the 1518 // queue waiting for the lock availability 1519 return lockState; 1520 } 1521 1522 lockState = executeRollback(proc); 1523 releaseLock(proc, false); 1524 boolean abortRollback = lockState != LockState.LOCK_ACQUIRED; 1525 abortRollback |= !isRunning() || !store.isRunning(); 1526 1527 // allows to kill the executor before something is stored to the wal. 1528 // useful to test the procedure recovery. 1529 if (abortRollback) { 1530 return lockState; 1531 } 1532 1533 subprocStack.remove(stackTail); 1534 1535 // if the procedure is kind enough to pass the slot to someone else, yield 1536 // if the proc is already finished, do not yield 1537 if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { 1538 return LockState.LOCK_YIELD_WAIT; 1539 } 1540 1541 if (proc != rootProc) { 1542 execCompletionCleanup(proc); 1543 } 1544 } finally { 1545 if (lockEntry != null) { 1546 procExecutionLock.releaseLockEntry(lockEntry); 1547 } 1548 } 1549 } 1550 1551 // Finalize the procedure state 1552 LOG.info("Rolled back {} exec-time={}", rootProc, 1553 StringUtils.humanTimeDiff(rootProc.elapsedTime())); 1554 procedureFinished(rootProc); 1555 return LockState.LOCK_ACQUIRED; 1556 } 1557 1558 private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) { 1559 if (proc.removeStackIndex()) { 1560 if (!proc.isSuccess()) { 1561 proc.setState(ProcedureState.ROLLEDBACK); 1562 } 1563 1564 // update metrics on finishing the procedure (fail) 1565 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false); 1566 1567 if (proc.hasParent()) { 1568 store.delete(proc.getProcId()); 1569 
procedures.remove(proc.getProcId()); 1570 } else { 1571 final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); 1572 if (childProcIds != null) { 1573 store.delete(proc, childProcIds); 1574 } else { 1575 store.update(proc); 1576 } 1577 } 1578 } else { 1579 store.update(proc); 1580 } 1581 } 1582 1583 /** 1584 * Execute the rollback of the procedure step. 1585 * It updates the store with the new state (stack index) 1586 * or will remove completly the procedure in case it is a child. 1587 */ 1588 private LockState executeRollback(Procedure<TEnvironment> proc) { 1589 try { 1590 proc.doRollback(getEnvironment()); 1591 } catch (IOException e) { 1592 LOG.debug("Roll back attempt failed for {}", proc, e); 1593 return LockState.LOCK_YIELD_WAIT; 1594 } catch (InterruptedException e) { 1595 handleInterruptedException(proc, e); 1596 return LockState.LOCK_YIELD_WAIT; 1597 } catch (Throwable e) { 1598 // Catch NullPointerExceptions or similar errors... 1599 LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); 1600 } 1601 1602 // allows to kill the executor before something is stored to the wal. 1603 // useful to test the procedure recovery. 1604 if (testing != null && testing.shouldKillBeforeStoreUpdate()) { 1605 String msg = "TESTING: Kill before store update"; 1606 LOG.debug(msg); 1607 stop(); 1608 throw new RuntimeException(msg); 1609 } 1610 1611 cleanupAfterRollbackOneStep(proc); 1612 1613 return LockState.LOCK_ACQUIRED; 1614 } 1615 1616 private void yieldProcedure(Procedure<TEnvironment> proc) { 1617 releaseLock(proc, false); 1618 scheduler.yield(proc); 1619 } 1620 1621 /** 1622 * Executes <code>procedure</code> 1623 * <ul> 1624 * <li>Calls the doExecute() of the procedure 1625 * <li>If the procedure execution didn't fail (i.e. valid user input) 1626 * <ul> 1627 * <li>...and returned subprocedures 1628 * <ul><li>The subprocedures are initialized. 
1629 * <li>The subprocedures are added to the store 1630 * <li>The subprocedures are added to the runnable queue 1631 * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete 1632 * </ul> 1633 * </li> 1634 * <li>...if there are no subprocedure 1635 * <ul><li>the procedure completed successfully 1636 * <li>if there is a parent (WAITING) 1637 * <li>the parent state will be set to RUNNABLE 1638 * </ul> 1639 * </li> 1640 * </ul> 1641 * </li> 1642 * <li>In case of failure 1643 * <ul> 1644 * <li>The store is updated with the new state</li> 1645 * <li>The executor (caller of this method) will start the rollback of the procedure</li> 1646 * </ul> 1647 * </li> 1648 * </ul> 1649 */ 1650 private void execProcedure(RootProcedureState<TEnvironment> procStack, 1651 Procedure<TEnvironment> procedure) { 1652 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 1653 "NOT RUNNABLE! " + procedure.toString()); 1654 1655 // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. 1656 // The exception is caught below and then we hurry to the exit without disturbing state. The 1657 // idea is that the processing of this procedure will be unsuspended later by an external event 1658 // such the report of a region open. 1659 boolean suspended = false; 1660 1661 // Whether to 're-' -execute; run through the loop again. 
1662 boolean reExecute = false; 1663 1664 Procedure<TEnvironment>[] subprocs = null; 1665 do { 1666 reExecute = false; 1667 procedure.resetPersistence(); 1668 try { 1669 subprocs = procedure.doExecute(getEnvironment()); 1670 if (subprocs != null && subprocs.length == 0) { 1671 subprocs = null; 1672 } 1673 } catch (ProcedureSuspendedException e) { 1674 LOG.trace("Suspend {}", procedure); 1675 suspended = true; 1676 } catch (ProcedureYieldException e) { 1677 LOG.trace("Yield {}", procedure, e); 1678 yieldProcedure(procedure); 1679 return; 1680 } catch (InterruptedException e) { 1681 LOG.trace("Yield interrupt {}", procedure, e); 1682 handleInterruptedException(procedure, e); 1683 yieldProcedure(procedure); 1684 return; 1685 } catch (Throwable e) { 1686 // Catch NullPointerExceptions or similar errors... 1687 String msg = "CODE-BUG: Uncaught runtime exception: " + procedure; 1688 LOG.error(msg, e); 1689 procedure.setFailure(new RemoteProcedureException(msg, e)); 1690 } 1691 1692 if (!procedure.isFailed()) { 1693 if (subprocs != null) { 1694 if (subprocs.length == 1 && subprocs[0] == procedure) { 1695 // Procedure returned itself. Quick-shortcut for a state machine-like procedure; 1696 // i.e. we go around this loop again rather than go back out on the scheduler queue. 1697 subprocs = null; 1698 reExecute = true; 1699 LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); 1700 } else { 1701 // Yield the current procedure, and make the subprocedure runnable 1702 // subprocs may come back 'null'. 1703 subprocs = initializeChildren(procStack, procedure, subprocs); 1704 LOG.info("Initialized subprocedures=" + 1705 (subprocs == null? null: 1706 Stream.of(subprocs).map(e -> "{" + e.toString() + "}"). 
1707 collect(Collectors.toList()).toString())); 1708 } 1709 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1710 LOG.trace("Added to timeoutExecutor {}", procedure); 1711 timeoutExecutor.add(procedure); 1712 } else if (!suspended) { 1713 // No subtask, so we are done 1714 procedure.setState(ProcedureState.SUCCESS); 1715 } 1716 } 1717 1718 // Add the procedure to the stack 1719 procStack.addRollbackStep(procedure); 1720 1721 // allows to kill the executor before something is stored to the wal. 1722 // useful to test the procedure recovery. 1723 if (testing != null && 1724 testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) { 1725 kill("TESTING: Kill BEFORE store update: " + procedure); 1726 } 1727 1728 // TODO: The code here doesn't check if store is running before persisting to the store as 1729 // it relies on the method call below to throw RuntimeException to wind up the stack and 1730 // executor thread to stop. The statement following the method call below seems to check if 1731 // store is not running, to prevent scheduling children procedures, re-execution or yield 1732 // of this procedure. This may need more scrutiny and subsequent cleanup in future 1733 // 1734 // Commit the transaction even if a suspend (state may have changed). Note this append 1735 // can take a bunch of time to complete. 
1736 if (procedure.needPersistence()) { 1737 updateStoreOnExec(procStack, procedure, subprocs); 1738 } 1739 1740 // if the store is not running we are aborting 1741 if (!store.isRunning()) { 1742 return; 1743 } 1744 // if the procedure is kind enough to pass the slot to someone else, yield 1745 if (procedure.isRunnable() && !suspended && 1746 procedure.isYieldAfterExecutionStep(getEnvironment())) { 1747 yieldProcedure(procedure); 1748 return; 1749 } 1750 1751 assert (reExecute && subprocs == null) || !reExecute; 1752 } while (reExecute); 1753 1754 // Allows to kill the executor after something is stored to the WAL but before the below 1755 // state settings are done -- in particular the one on the end where we make parent 1756 // RUNNABLE again when its children are done; see countDownChildren. 1757 if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) { 1758 kill("TESTING: Kill AFTER store update: " + procedure); 1759 } 1760 1761 // Submit the new subprocedures 1762 if (subprocs != null && !procedure.isFailed()) { 1763 submitChildrenProcedures(subprocs); 1764 } 1765 1766 // we need to log the release lock operation before waking up the parent procedure, as there 1767 // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all 1768 // the sub procedures from store and cause problems... 1769 releaseLock(procedure, false); 1770 1771 // if the procedure is complete and has a parent, count down the children latch. 1772 // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 
1773 if (!suspended && procedure.isFinished() && procedure.hasParent()) { 1774 countDownChildren(procStack, procedure); 1775 } 1776 } 1777 1778 private void kill(String msg) { 1779 LOG.debug(msg); 1780 stop(); 1781 throw new RuntimeException(msg); 1782 } 1783 1784 private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, 1785 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1786 assert subprocs != null : "expected subprocedures"; 1787 final long rootProcId = getRootProcedureId(procedure); 1788 for (int i = 0; i < subprocs.length; ++i) { 1789 Procedure<TEnvironment> subproc = subprocs[i]; 1790 if (subproc == null) { 1791 String msg = "subproc[" + i + "] is null, aborting the procedure"; 1792 procedure.setFailure(new RemoteProcedureException(msg, 1793 new IllegalArgumentIOException(msg))); 1794 return null; 1795 } 1796 1797 assert subproc.getState() == ProcedureState.INITIALIZING : subproc; 1798 subproc.setParentProcId(procedure.getProcId()); 1799 subproc.setRootProcId(rootProcId); 1800 subproc.setProcId(nextProcId()); 1801 procStack.addSubProcedure(subproc); 1802 } 1803 1804 if (!procedure.isFailed()) { 1805 procedure.setChildrenLatch(subprocs.length); 1806 switch (procedure.getState()) { 1807 case RUNNABLE: 1808 procedure.setState(ProcedureState.WAITING); 1809 break; 1810 case WAITING_TIMEOUT: 1811 timeoutExecutor.add(procedure); 1812 break; 1813 default: 1814 break; 1815 } 1816 } 1817 return subprocs; 1818 } 1819 1820 private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { 1821 for (int i = 0; i < subprocs.length; ++i) { 1822 Procedure<TEnvironment> subproc = subprocs[i]; 1823 subproc.updateMetricsOnSubmit(getEnvironment()); 1824 assert !procedures.containsKey(subproc.getProcId()); 1825 procedures.put(subproc.getProcId(), subproc); 1826 scheduler.addFront(subproc); 1827 } 1828 } 1829 1830 private void countDownChildren(RootProcedureState<TEnvironment> procStack, 1831 
Procedure<TEnvironment> procedure) { 1832 Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); 1833 if (parent == null) { 1834 assert procStack.isRollingback(); 1835 return; 1836 } 1837 1838 // If this procedure is the last child awake the parent procedure 1839 if (parent.tryRunnable()) { 1840 // If we succeeded in making the parent runnable -- i.e. all of its 1841 // children have completed, move parent to front of the queue. 1842 store.update(parent); 1843 scheduler.addFront(parent); 1844 LOG.info("Finished subprocedure pid={}, resume processing parent {}", 1845 procedure.getProcId(), parent); 1846 return; 1847 } 1848 } 1849 1850 private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, 1851 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1852 if (subprocs != null && !procedure.isFailed()) { 1853 if (LOG.isTraceEnabled()) { 1854 LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); 1855 } 1856 store.insert(procedure, subprocs); 1857 } else { 1858 LOG.trace("Store update {}", procedure); 1859 if (procedure.isFinished() && !procedure.hasParent()) { 1860 // remove child procedures 1861 final long[] childProcIds = procStack.getSubprocedureIds(); 1862 if (childProcIds != null) { 1863 store.delete(procedure, childProcIds); 1864 for (int i = 0; i < childProcIds.length; ++i) { 1865 procedures.remove(childProcIds[i]); 1866 } 1867 } else { 1868 store.update(procedure); 1869 } 1870 } else { 1871 store.update(procedure); 1872 } 1873 } 1874 } 1875 1876 private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { 1877 LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); 1878 // NOTE: We don't call Thread.currentThread().interrupt() 1879 // because otherwise all the subsequent calls e.g. Thread.sleep() will throw 1880 // the InterruptedException. 
If the master is going down, we will be notified 1881 // and the executor/store will be stopped. 1882 // (The interrupted procedure will be retried on the next run) 1883 } 1884 1885 private void execCompletionCleanup(Procedure<TEnvironment> proc) { 1886 final TEnvironment env = getEnvironment(); 1887 if (proc.hasLock()) { 1888 LOG.warn("Usually this should not happen, we will release the lock before if the procedure" + 1889 " is finished, even if the holdLock is true, arrive here means we have some holes where" + 1890 " we do not release the lock. And the releaseLock below may fail since the procedure may" + 1891 " have already been deleted from the procedure store."); 1892 releaseLock(proc, true); 1893 } 1894 try { 1895 proc.completionCleanup(env); 1896 } catch (Throwable e) { 1897 // Catch NullPointerExceptions or similar errors... 1898 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); 1899 } 1900 } 1901 1902 private void procedureFinished(Procedure<TEnvironment> proc) { 1903 // call the procedure completion cleanup handler 1904 execCompletionCleanup(proc); 1905 1906 CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); 1907 1908 // update the executor internal state maps 1909 if (!proc.shouldWaitClientAck(getEnvironment())) { 1910 retainer.setClientAckTime(0); 1911 } 1912 1913 completed.put(proc.getProcId(), retainer); 1914 rollbackStack.remove(proc.getProcId()); 1915 procedures.remove(proc.getProcId()); 1916 1917 // call the runnableSet completion cleanup handler 1918 try { 1919 scheduler.completionCleanup(proc); 1920 } catch (Throwable e) { 1921 // Catch NullPointerExceptions or similar errors... 
1922 LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); 1923 } 1924 1925 // Notify the listeners 1926 sendProcedureFinishedNotification(proc.getProcId()); 1927 } 1928 1929 RootProcedureState<TEnvironment> getProcStack(long rootProcId) { 1930 return rollbackStack.get(rootProcId); 1931 } 1932 1933 @VisibleForTesting 1934 ProcedureScheduler getProcedureScheduler() { 1935 return scheduler; 1936 } 1937 1938 @VisibleForTesting 1939 int getCompletedSize() { 1940 return completed.size(); 1941 } 1942 1943 @VisibleForTesting 1944 public IdLock getProcExecutionLock() { 1945 return procExecutionLock; 1946 } 1947 1948 // ========================================================================== 1949 // Worker Thread 1950 // ========================================================================== 1951 private class WorkerThread extends StoppableThread { 1952 private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); 1953 private volatile Procedure<TEnvironment> activeProcedure; 1954 1955 public WorkerThread(ThreadGroup group) { 1956 this(group, "PEWorker-"); 1957 } 1958 1959 protected WorkerThread(ThreadGroup group, String prefix) { 1960 super(group, prefix + workerId.incrementAndGet()); 1961 setDaemon(true); 1962 } 1963 1964 @Override 1965 public void sendStopSignal() { 1966 scheduler.signalAll(); 1967 } 1968 @Override 1969 public void run() { 1970 long lastUpdate = EnvironmentEdgeManager.currentTime(); 1971 try { 1972 while (isRunning() && keepAlive(lastUpdate)) { 1973 @SuppressWarnings("unchecked") 1974 Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); 1975 if (proc == null) { 1976 continue; 1977 } 1978 this.activeProcedure = proc; 1979 int activeCount = activeExecutorCount.incrementAndGet(); 1980 int runningCount = store.setRunningProcedureCount(activeCount); 1981 LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), 1982 runningCount, activeCount); 1983 
executionStartTime.set(EnvironmentEdgeManager.currentTime()); 1984 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1985 try { 1986 executeProcedure(proc); 1987 } catch (AssertionError e) { 1988 LOG.info("ASSERT pid=" + proc.getProcId(), e); 1989 throw e; 1990 } finally { 1991 procExecutionLock.releaseLockEntry(lockEntry); 1992 activeCount = activeExecutorCount.decrementAndGet(); 1993 runningCount = store.setRunningProcedureCount(activeCount); 1994 LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), 1995 runningCount, activeCount); 1996 this.activeProcedure = null; 1997 lastUpdate = EnvironmentEdgeManager.currentTime(); 1998 executionStartTime.set(Long.MAX_VALUE); 1999 } 2000 } 2001 } catch (Throwable t) { 2002 LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); 2003 } finally { 2004 LOG.trace("Worker terminated."); 2005 } 2006 workerThreads.remove(this); 2007 } 2008 2009 @Override 2010 public String toString() { 2011 Procedure<?> p = this.activeProcedure; 2012 return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")"); 2013 } 2014 2015 /** 2016 * @return the time since the current procedure is running 2017 */ 2018 public long getCurrentRunTime() { 2019 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); 2020 } 2021 2022 // core worker never timeout 2023 protected boolean keepAlive(long lastUpdate) { 2024 return true; 2025 } 2026 } 2027 2028 // A worker thread which can be added when core workers are stuck. Will timeout after 2029 // keepAliveTime if there is no procedure to run. 
2030 private final class KeepAliveWorkerThread extends WorkerThread { 2031 public KeepAliveWorkerThread(ThreadGroup group) { 2032 super(group, "KeepAlivePEWorker-"); 2033 } 2034 2035 @Override 2036 protected boolean keepAlive(long lastUpdate) { 2037 return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; 2038 } 2039 } 2040 2041 // ---------------------------------------------------------------------------- 2042 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the 2043 // full set of procedures pending and completed to write a compacted 2044 // version of the log (in case is a log)? 2045 // In theory no, procedures are have a short life, so at some point the store 2046 // will have the tracker saying everything is in the last log. 2047 // ---------------------------------------------------------------------------- 2048 2049 private final class WorkerMonitor extends InlineChore { 2050 public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = 2051 "hbase.procedure.worker.monitor.interval.msec"; 2052 private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec 2053 2054 public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = 2055 "hbase.procedure.worker.stuck.threshold.msec"; 2056 private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec 2057 2058 public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = 2059 "hbase.procedure.worker.add.stuck.percentage"; 2060 private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck 2061 2062 private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; 2063 private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; 2064 private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; 2065 2066 public WorkerMonitor() { 2067 refreshConfig(); 2068 } 2069 2070 @Override 2071 public void run() { 2072 final int stuckCount = checkForStuckWorkers(); 2073 checkThreadCount(stuckCount); 2074 2075 // refresh interval 
(poor man dynamic conf update) 2076 refreshConfig(); 2077 } 2078 2079 private int checkForStuckWorkers() { 2080 // check if any of the worker is stuck 2081 int stuckCount = 0; 2082 for (WorkerThread worker : workerThreads) { 2083 if (worker.getCurrentRunTime() < stuckThreshold) { 2084 continue; 2085 } 2086 2087 // WARN the worker is stuck 2088 stuckCount++; 2089 LOG.warn("Worker stuck {}, run time {}", worker, 2090 StringUtils.humanTimeDiff(worker.getCurrentRunTime())); 2091 } 2092 return stuckCount; 2093 } 2094 2095 private void checkThreadCount(final int stuckCount) { 2096 // nothing to do if there are no runnable tasks 2097 if (stuckCount < 1 || !scheduler.hasRunnables()) { 2098 return; 2099 } 2100 2101 // add a new thread if the worker stuck percentage exceed the threshold limit 2102 // and every handler is active. 2103 final float stuckPerc = ((float) stuckCount) / workerThreads.size(); 2104 // let's add new worker thread more aggressively, as they will timeout finally if there is no 2105 // work to do. 2106 if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) { 2107 final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup); 2108 workerThreads.add(worker); 2109 worker.start(); 2110 LOG.debug("Added new worker thread {}", worker); 2111 } 2112 } 2113 2114 private void refreshConfig() { 2115 addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, 2116 DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); 2117 timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, 2118 DEFAULT_WORKER_MONITOR_INTERVAL); 2119 stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, 2120 DEFAULT_WORKER_STUCK_THRESHOLD); 2121 } 2122 2123 @Override 2124 public int getTimeoutInterval() { 2125 return timeoutInterval; 2126 } 2127 } 2128}