001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.UncheckedIOException; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Deque; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.Executor; 033import java.util.concurrent.Executors; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicLong; 038import java.util.stream.Collectors; 039import java.util.stream.Stream; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 043import org.apache.hadoop.hbase.log.HBaseMarkers; 044import org.apache.hadoop.hbase.procedure2.Procedure.LockState; 045import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 048import org.apache.hadoop.hbase.procedure2.util.StringUtils; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.IdLock; 052import org.apache.hadoop.hbase.util.NonceKey; 053import org.apache.hadoop.hbase.util.Threads; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 063 064/** 065 * Thread Pool that executes the submitted procedures. 066 * The executor has a ProcedureStore associated. 067 * Each operation is logged and on restart the pending procedures are resumed. 068 * 069 * Unless the Procedure code throws an error (e.g. invalid user input) 070 * the procedure will complete (at some point in time), On restart the pending 071 * procedures are resumed and the once failed will be rolledback. 
072 * 073 * The user can add procedures to the executor via submitProcedure(proc) 074 * check for the finished state via isFinished(procId) 075 * and get the result via getResult(procId) 076 */ 077@InterfaceAudience.Private 078public class ProcedureExecutor<TEnvironment> { 079 private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); 080 081 public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; 082 private static final boolean DEFAULT_CHECK_OWNER_SET = false; 083 084 public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = 085 "hbase.procedure.worker.keep.alive.time.msec"; 086 private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); 087 088 public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; 089 static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min 090 091 public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl"; 092 static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min 093 094 /** 095 * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to 096 * break PE having it fail at various junctures. When non-null, testing is set to an instance of 097 * the below internal {@link Testing} class with flags set for the particular test. 098 */ 099 volatile Testing testing = null; 100 101 /** 102 * Class with parameters describing how to fail/die when in testing-context. 103 */ 104 public static class Testing { 105 protected volatile boolean killIfHasParent = true; 106 protected volatile boolean killIfSuspended = false; 107 108 /** 109 * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is 110 * persisting all the state it needs to recover after a crash. 
111 */ 112 protected volatile boolean killBeforeStoreUpdate = false; 113 protected volatile boolean toggleKillBeforeStoreUpdate = false; 114 115 /** 116 * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978 117 * is about a case where memory-state was being set after store to WAL where a crash could 118 * cause us to get stuck. This flag allows killing at what was a vulnerable time. 119 */ 120 protected volatile boolean killAfterStoreUpdate = false; 121 protected volatile boolean toggleKillAfterStoreUpdate = false; 122 123 protected boolean shouldKillBeforeStoreUpdate() { 124 final boolean kill = this.killBeforeStoreUpdate; 125 if (this.toggleKillBeforeStoreUpdate) { 126 this.killBeforeStoreUpdate = !kill; 127 LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); 128 } 129 return kill; 130 } 131 132 protected boolean shouldKillBeforeStoreUpdate(boolean isSuspended, boolean hasParent) { 133 if (isSuspended && !killIfSuspended) { 134 return false; 135 } 136 if (hasParent && !killIfHasParent) { 137 return false; 138 } 139 return shouldKillBeforeStoreUpdate(); 140 } 141 142 protected boolean shouldKillAfterStoreUpdate() { 143 final boolean kill = this.killAfterStoreUpdate; 144 if (this.toggleKillAfterStoreUpdate) { 145 this.killAfterStoreUpdate = !kill; 146 LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate); 147 } 148 return kill; 149 } 150 151 protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { 152 return (isSuspended && !killIfSuspended) ? false : shouldKillAfterStoreUpdate(); 153 } 154 } 155 156 public interface ProcedureExecutorListener { 157 void procedureLoaded(long procId); 158 void procedureAdded(long procId); 159 void procedureFinished(long procId); 160 } 161 162 /** 163 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. 
164 * Once a Root-Procedure completes (success or failure), the result will be added to this map. 165 * The user of ProcedureExecutor should call getResult(procId) to get the result. 166 */ 167 private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed = 168 new ConcurrentHashMap<>(); 169 170 /** 171 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. 172 * The RootProcedureState contains the execution stack of the Root-Procedure, 173 * It is added to the map by submitProcedure() and removed on procedure completion. 174 */ 175 private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack = 176 new ConcurrentHashMap<>(); 177 178 /** 179 * Helper map to lookup the live procedures by ID. 180 * This map contains every procedure. root-procedures and subprocedures. 181 */ 182 private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures = 183 new ConcurrentHashMap<>(); 184 185 /** 186 * Helper map to lookup whether the procedure already issued from the same client. This map 187 * contains every root procedure. 188 */ 189 private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>(); 190 191 private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = 192 new CopyOnWriteArrayList<>(); 193 194 private Configuration conf; 195 196 /** 197 * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing 198 * resource handling rather than observing in a #join is unexpected). 199 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery 200 * (Should be ok). 201 */ 202 private ThreadGroup threadGroup; 203 204 /** 205 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing 206 * resource handling rather than observing in a #join is unexpected). 
207 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery 208 * (Should be ok). 209 */ 210 private CopyOnWriteArrayList<WorkerThread> workerThreads; 211 212 /** 213 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing 214 * resource handling rather than observing in a #join is unexpected). 215 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery 216 * (Should be ok). 217 */ 218 private TimeoutExecutorThread<TEnvironment> timeoutExecutor; 219 220 /** 221 * WorkerMonitor check for stuck workers and new worker thread when necessary, for example if 222 * there is no worker to assign meta, it will new worker thread for it, so it is very important. 223 * TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore 224 * and so on, some tasks may execute for a long time so will block other tasks like 225 * WorkerMonitor, so use a dedicated thread for executing WorkerMonitor. 226 */ 227 private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor; 228 229 private int corePoolSize; 230 private int maxPoolSize; 231 232 private volatile long keepAliveTime; 233 234 /** 235 * Scheduler/Queue that contains runnable procedures. 236 */ 237 private final ProcedureScheduler scheduler; 238 239 private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor( 240 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build()); 241 242 private final AtomicLong lastProcId = new AtomicLong(-1); 243 private final AtomicLong workerId = new AtomicLong(0); 244 private final AtomicInteger activeExecutorCount = new AtomicInteger(0); 245 private final AtomicBoolean running = new AtomicBoolean(false); 246 private final TEnvironment environment; 247 private final ProcedureStore store; 248 249 private final boolean checkOwnerSet; 250 251 // To prevent concurrent execution of the same procedure. 
252 // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the 253 // procedure is woken up before we finish the suspend which causes the same procedures to be 254 // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also 255 // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent 256 // execution of the same procedure. 257 private final IdLock procExecutionLock = new IdLock(); 258 259 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 260 final ProcedureStore store) { 261 this(conf, environment, store, new SimpleProcedureScheduler()); 262 } 263 264 private boolean isRootFinished(Procedure<?> proc) { 265 Procedure<?> rootProc = procedures.get(proc.getRootProcId()); 266 return rootProc == null || rootProc.isFinished(); 267 } 268 269 private void forceUpdateProcedure(long procId) throws IOException { 270 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); 271 try { 272 Procedure<TEnvironment> proc = procedures.get(procId); 273 if (proc != null) { 274 if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) { 275 LOG.debug("Procedure {} has already been finished and parent is succeeded," + 276 " skip force updating", proc); 277 return; 278 } 279 } else { 280 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 281 if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) { 282 LOG.debug("No pending procedure with id = {}, skip force updating.", procId); 283 return; 284 } 285 long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL); 286 long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL); 287 if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) { 288 LOG.debug("Procedure {} has already been finished and expired, skip force updating", 289 procId); 290 return; 291 } 292 proc = retainer.getProcedure(); 
293 } 294 LOG.debug("Force update procedure {}", proc); 295 store.update(proc); 296 } finally { 297 procExecutionLock.releaseLockEntry(lockEntry); 298 } 299 } 300 301 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 302 final ProcedureStore store, final ProcedureScheduler scheduler) { 303 this.environment = environment; 304 this.scheduler = scheduler; 305 this.store = store; 306 this.conf = conf; 307 this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); 308 refreshConfiguration(conf); 309 store.registerListener(new ProcedureStoreListener() { 310 311 @Override 312 public void forceUpdate(long[] procIds) { 313 Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> { 314 try { 315 forceUpdateProcedure(procId); 316 } catch (IOException e) { 317 LOG.warn("Failed to force update procedure with pid={}", procId); 318 } 319 })); 320 } 321 }); 322 } 323 324 private void load(final boolean abortOnCorruption) throws IOException { 325 Preconditions.checkArgument(completed.isEmpty(), "completed not empty"); 326 Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty"); 327 Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty"); 328 Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty"); 329 330 store.load(new ProcedureStore.ProcedureLoader() { 331 @Override 332 public void setMaxProcId(long maxProcId) { 333 assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()"; 334 lastProcId.set(maxProcId); 335 } 336 337 @Override 338 public void load(ProcedureIterator procIter) throws IOException { 339 loadProcedures(procIter, abortOnCorruption); 340 } 341 342 @Override 343 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 344 int corruptedCount = 0; 345 while (procIter.hasNext()) { 346 Procedure<?> proc = procIter.next(); 347 LOG.error("Corrupt " + proc); 348 corruptedCount++; 349 } 350 if 
(abortOnCorruption && corruptedCount > 0) { 351 throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay"); 352 } 353 } 354 }); 355 } 356 357 private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) { 358 proc.restoreLock(getEnvironment()); 359 restored.add(proc.getProcId()); 360 } 361 362 private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) { 363 while (!stack.isEmpty()) { 364 restoreLock(stack.pop(), restored); 365 } 366 } 367 368 // Restore the locks for all the procedures. 369 // Notice that we need to restore the locks starting from the root proc, otherwise there will be 370 // problem that a sub procedure may hold the exclusive lock first and then we are stuck when 371 // calling the acquireLock method for the parent procedure. 372 // The algorithm is straight-forward: 373 // 1. Use a set to record the procedures which locks have already been restored. 374 // 2. Use a stack to store the hierarchy of the procedures 375 // 3. For all the procedure, we will first try to find its parent and push it into the stack, 376 // unless 377 // a. We have no parent, i.e, we are the root procedure 378 // b. The lock has already been restored(by checking the set introduced in #1) 379 // then we start to pop the stack and call acquireLock for each procedure. 380 // Notice that this should be done for all procedures, not only the ones in runnableList. 
  /**
   * Restores the locks of every procedure in {@link #procedures}, ancestors before descendants.
   * <p/>
   * For each procedure, walk up the parent chain and push each procedure that still has a parent
   * onto {@code stack}. The walk stops at a procedure whose lock is already restored, or at a root
   * procedure, which is restored immediately. The stack is then popped, restoring the pushed
   * procedures top-down. {@code restored} ensures each procId is restored only once.
   * <p/>
   * NOTE(review): if a parent id is missing from {@link #procedures}, {@code proc} becomes null and
   * the next iteration throws a NullPointerException. This relies on orphan procedures having been
   * routed to handleCorrupted (see the assert on rootProcId in loadProcedures).
   */
  private void restoreLocks() {
    Set<Long> restored = new HashSet<>();
    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
    procedures.values().forEach(proc -> {
      for (;;) {
        if (restored.contains(proc.getProcId())) {
          // Everything above is already restored; restore what we pushed, top-down.
          restoreLocks(stack, restored);
          return;
        }
        if (!proc.hasParent()) {
          // Reached the root: restore it first, then its pushed descendants.
          restoreLock(proc, restored);
          restoreLocks(stack, restored);
          return;
        }
        stack.push(proc);
        proc = procedures.get(proc.getParentProcId());
      }
    });
  }

  /**
   * Replays the procedures read from the store into the executor's in-memory state.
   * <p/>
   * The method makes two passes over {@code procIter}:
   * <ol>
   * <li>The first pass puts finished procedures in {@link #completed} and live ones in
   * {@link #procedures}. It creates a RootProcedureState for each live root, records nonces and
   * counts the live procedures per state.</li>
   * <li>The second pass, after {@code procIter.reset()}, rebuilds the rollback stacks, the
   * children latches and the root ids, and buckets the live procedures by state.</li>
   * </ol>
   * After that:
   * <ul>
   * <li>WAITING procedures without children become RUNNABLE.</li>
   * <li>Locks are restored.</li>
   * <li>WAITING_TIMEOUT procedures go to the timeout executor.</li>
   * <li>FAILED and RUNNABLE procedures go to the scheduler, and the workers are signalled.</li>
   * </ul>
   * @param procIter iterator over the stored procedures; it is rewound with {@code reset()}
   * @param abortOnCorruption not read by this method (corruption is handled by the loader's
   *          handleCorrupted callback)
   * @throws UnsupportedOperationException if a live procedure is in ROLLEDBACK or INITIALIZING
   *           state
   */
  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
      throws IOException {
    // 1. Build the rollback stack
    int runnableCount = 0;
    int failedCount = 0;
    int waitingCount = 0;
    int waitingTimeoutCount = 0;
    while (procIter.hasNext()) {
      boolean finished = procIter.isNextFinished();
      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      NonceKey nonceKey = proc.getNonceKey();
      long procId = proc.getProcId();

      if (finished) {
        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
        LOG.debug("Completed {}", proc);
      } else {
        if (!proc.hasParent()) {
          assert !proc.isFinished() : "unexpected finished procedure";
          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
        }

        // add the procedure to the map
        proc.beforeReplay(getEnvironment());
        procedures.put(proc.getProcId(), proc);
        // Count per state only to presize the lists built in step 2.
        switch (proc.getState()) {
          case RUNNABLE:
            runnableCount++;
            break;
          case FAILED:
            failedCount++;
            break;
          case WAITING:
            waitingCount++;
            break;
          case WAITING_TIMEOUT:
            waitingTimeoutCount++;
            break;
          default:
            break;
        }
      }

      if (nonceKey != null) {
        nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
      }
    }

    // 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we
    // pushed them into the ProcedureScheduler directly to execute the rollback. But this does not
    // work after we introduced the restore lock stage. For now, when we acquire a xlock, we will
    // remove the queue from runQueue in scheduler, and then when a procedure which has lock access,
    // for example, a sub procedure of the procedure which has the xlock, is pushed into the
    // scheduler, we will add the queue back to let the workers poll from it. The assumption here is
    // that the procedure which has the xlock should have been polled out already, so when loading
    // we can not add the procedure to the scheduler first and then call acquireLock: the procedure
    // would still be in the queue, and since we remove the queue from runQueue, no one could poll
    // it out, resulting in a deadlock.
    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
    procIter.reset();
    while (procIter.hasNext()) {
      if (procIter.isNextFinished()) {
        procIter.skipNext();
        continue;
      }

      @SuppressWarnings("unchecked")
      Procedure<TEnvironment> proc = procIter.next();
      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
      LOG.debug("Loading {}", proc);
      Long rootProcId = getRootProcedureId(proc);
      // The orphan procedures will be passed to handleCorrupted, so add an assert here
      assert rootProcId != null;

      if (proc.hasParent()) {
        // Re-count this unfinished child on its parent.
        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
        if (parent != null && !proc.isFinished()) {
          parent.incChildrenLatch();
        }
      }

      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
      procStack.loadStack(proc);

      proc.setRootProcId(rootProcId);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableList.add(proc);
          break;
        case WAITING:
          waitingList.add(proc);
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutList.add(proc);
          break;
        case FAILED:
          failedList.add(proc);
          break;
        case ROLLEDBACK:
        case INITIALIZING:
          String msg = "Unexpected " + proc.getState() + " state for " + proc;
          LOG.error(msg);
          throw new UnsupportedOperationException(msg);
        default:
          break;
      }
    }

    // 3. Check the waiting procedures to see if some of them can be added to runnable.
    waitingList.forEach(proc -> {
      if (!proc.hasChildren()) {
        // Normally, WAITING procedures should be woken by their children. But there is a case
        // where all the children are successful and, before they can wake up their parent
        // procedure, the master was killed. So, during recovery of the procedures from the
        // ProcedureWal, the children are not loaded because of their SUCCESS state. So we need to
        // continue to run this WAITING procedure. But before executing, we need to set its state
        // to RUNNABLE, otherwise an exception will be thrown:
        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        // "NOT RUNNABLE! " + procedure.toString());
        proc.setState(ProcedureState.RUNNABLE);
        runnableList.add(proc);
      } else {
        proc.afterReplay(getEnvironment());
      }
    });
    // 4. restore locks
    restoreLocks();

    // 5. Push the procedures to the timeout executor
    waitingTimeoutList.forEach(proc -> {
      proc.afterReplay(getEnvironment());
      timeoutExecutor.add(proc);
    });

    // 6. Push the procedure to the scheduler
    failedList.forEach(scheduler::addBack);
    runnableList.forEach(p -> {
      p.afterReplay(getEnvironment());
      if (!p.hasParent()) {
        sendProcedureLoadedNotification(p.getProcId());
      }
      scheduler.addBack(p);
    });
    // After all procedures put into the queue, signal the worker threads.
    // Otherwise, there is a race condition. See HBASE-21364.
    scheduler.signalAll();
  }

  /**
   * Initialize the procedure executor, but do not start workers. We will start them later.
   * <p/>
   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
   * ensure a single executor, and start the procedure replay to resume and recover the previous
   * pending and in-progress procedures.
   * @param numThreads number of threads available for procedure execution.
   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
   *          is found on replay. otherwise false.
   */
  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
    // We create numThreads core worker threads plus two TimeoutExecutorThreads: one
    // ("ProcExecTimeout") for timing out procedures and running periodic chores, and a dedicated
    // one ("WorkerMonitor") for the worker monitor. The max (burst) worker count is 10x the core.
    this.corePoolSize = numThreads;
    this.maxPoolSize = 10 * numThreads;
    // NOTE(review): numThreads is supplied by the caller; the "(bigger of cpus/4 or 16)" text below
    // describes a policy not computed in this method.
    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}",
        corePoolSize, maxPoolSize);

    this.threadGroup = new ThreadGroup("PEWorkerGroup");
    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

    // Create the workers
    workerId.set(0);
    workerThreads = new CopyOnWriteArrayList<>();
    for (int i = 0; i < corePoolSize; ++i) {
      workerThreads.add(new WorkerThread(threadGroup));
    }

    long st, et;

    // Acquire the store lease.
    st = System.nanoTime();
    store.recoverLease();
    et = System.nanoTime();
    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));

    // start the procedure scheduler
    scheduler.start();

    // TODO: Split in two steps.
    // TODO: Handle corrupted procedures (currently just a warn)
    // The first one will make sure that we have the latest id,
    // so we can start the threads and accept new procedures.
    // The second step will do the actual load of old procedures.
    st = System.nanoTime();
    load(abortOnCorruption);
    et = System.nanoTime();
    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
  }

  /**
   * Start the workers. Does nothing (beyond a warning) if already running.
   * <p/>
   * Starts the timeout and worker-monitor threads and all core workers, then schedules the
   * WorkerMonitor and the CompletedProcedureCleaner chore. Expects {@link #init(int, boolean)} to
   * have been called first.
   */
  public void startWorkers() throws IOException {
    if (!running.compareAndSet(false, true)) {
      LOG.warn("Already running");
      return;
    }
    // Start the executors. Here we must have the lastProcId set.
    LOG.trace("Start workers {}", workerThreads.size());
    timeoutExecutor.start();
    workerMonitorExecutor.start();
    for (WorkerThread worker: workerThreads) {
      worker.start();
    }

    // Internal chores
    workerMonitorExecutor.add(new WorkerMonitor());

    // Add completed cleaner chore
    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
      nonceKeysToProcIdsMap));
  }

  /**
   * Signal the executor to stop: marks it not running, stops the scheduler and sends a stop
   * signal to both timeout executor threads. Does not wait; use {@link #join()} for that.
   * No-op if already stopped.
   */
  public void stop() {
    if (!running.getAndSet(false)) {
      return;
    }

    LOG.info("Stopping");
    scheduler.stop();
    timeoutExecutor.sendStopSignal();
    workerMonitorExecutor.sendStopSignal();
  }

  /**
   * Wait for the executor threads to terminate, destroy the thread group and reset all in-memory
   * state. Must be called after {@link #stop()} (asserted).
   */
  @VisibleForTesting
  public void join() {
    assert !isRunning() : "expected not running";

    // stop the timeout executor
    timeoutExecutor.awaitTermination();
    // stop the work monitor executor
    workerMonitorExecutor.awaitTermination();

    // stop the worker threads
    for (WorkerThread worker: workerThreads) {
      worker.awaitTermination();
    }

    // Destroy the Thread Group for the executors
    // TODO: Fix. #join is not place to destroy resources.
    try {
      threadGroup.destroy();
    } catch (IllegalThreadStateException e) {
      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
          this.threadGroup, e.getMessage());
      // This dumps list of threads on STDOUT.
      this.threadGroup.list();
    }

    // reset the in-memory state for testing
    completed.clear();
    rollbackStack.clear();
    procedures.clear();
    nonceKeysToProcIdsMap.clear();
    scheduler.clear();
    lastProcId.set(-1);
  }

  /**
   * Replace the configuration and re-read the worker keep-alive time from it. Also called from
   * the constructor.
   */
  public void refreshConfiguration(final Configuration conf) {
    this.conf = conf;
    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
  }

  // ==========================================================================
  //  Accessors
  // ==========================================================================
  public boolean isRunning() {
    return running.get();
  }

  /**
   * @return the current number of worker threads.
   */
  public int getWorkerThreadCount() {
    return workerThreads.size();
  }

  /**
   * @return the core pool size settings.
   */
  public int getCorePoolSize() {
    return corePoolSize;
  }

  public int getActiveExecutorCount() {
    return activeExecutorCount.get();
  }

  public TEnvironment getEnvironment() {
    return this.environment;
  }

  public ProcedureStore getStore() {
    return this.store;
  }

  ProcedureScheduler getScheduler() {
    return scheduler;
  }

  /**
   * Set the worker keep-alive time (stored in milliseconds), then call scheduler.signalAll() —
   * presumably so idle workers re-check the new value; confirm against WorkerThread.
   */
  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
    this.scheduler.signalAll();
  }

  /**
   * @return the worker keep-alive time converted to {@code timeUnit}.
   */
  public long getKeepAliveTime(final TimeUnit timeUnit) {
    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
  }

  // ==========================================================================
  //  Submit/Remove Chores
  // ==========================================================================

  /**
   * Add a chore procedure to the executor. The chore is set to WAITING_TIMEOUT and handed to the
   * timeout executor.
   * @param chore the chore to add
   */
  public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
    chore.setState(ProcedureState.WAITING_TIMEOUT);
    timeoutExecutor.add(chore);
  }

  /**
   * Remove a chore procedure from the executor. The chore is marked SUCCESS before removal.
   * @param chore the chore to remove
   * @return whether the chore is removed, or it will be removed later
   */
  public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
    chore.setState(ProcedureState.SUCCESS);
    return timeoutExecutor.remove(chore);
  }

  // ==========================================================================
  //  Nonce Procedure helpers
  // ==========================================================================
  /**
   * Create a NonceKey from the specified nonceGroup and nonce.
   * @param nonceGroup the group to use for the {@link NonceKey}
   * @param nonce the nonce to use in the {@link NonceKey}
   * @return the generated NonceKey, or null when {@code nonce} is {@code HConstants.NO_NONCE}
   */
  public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
    return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce);
  }

  /**
   * Register a nonce for a procedure that is going to be submitted.
   * A procId will be reserved and on submitProcedure(),
   * the procedure with the specified nonce will take the reserved ProcId.
   * If someone already reserved the nonce, this method will return the procId reserved,
   * otherwise an invalid procId will be returned, and the caller should proceed
   * and submit the procedure.
   *
   * @param nonceKey A unique identifier for this operation from the client or process.
   * @return the procId associated with the nonce, if any otherwise an invalid procId.
773 */ 774 public long registerNonce(final NonceKey nonceKey) { 775 if (nonceKey == null) { 776 return -1; 777 } 778 779 // check if we have already a Reserved ID for the nonce 780 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); 781 if (oldProcId == null) { 782 // reserve a new Procedure ID, this will be associated with the nonce 783 // and the procedure submitted with the specified nonce will use this ID. 784 final long newProcId = nextProcId(); 785 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); 786 if (oldProcId == null) { 787 return -1; 788 } 789 } 790 791 // we found a registered nonce, but the procedure may not have been submitted yet. 792 // since the client expect the procedure to be submitted, spin here until it is. 793 final boolean traceEnabled = LOG.isTraceEnabled(); 794 while (isRunning() && 795 !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && 796 nonceKeysToProcIdsMap.containsKey(nonceKey)) { 797 if (traceEnabled) { 798 LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted"); 799 } 800 Threads.sleep(100); 801 } 802 return oldProcId.longValue(); 803 } 804 805 /** 806 * Remove the NonceKey if the procedure was not submitted to the executor. 807 * @param nonceKey A unique identifier for this operation from the client or process. 808 */ 809 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { 810 if (nonceKey == null) { 811 return; 812 } 813 814 final Long procId = nonceKeysToProcIdsMap.get(nonceKey); 815 if (procId == null) { 816 return; 817 } 818 819 // if the procedure was not submitted, remove the nonce 820 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { 821 nonceKeysToProcIdsMap.remove(nonceKey); 822 } 823 } 824 825 /** 826 * If the failure failed before submitting it, we may want to give back the 827 * same error to the requests with the same nonceKey. 
828 * 829 * @param nonceKey A unique identifier for this operation from the client or process 830 * @param procName name of the procedure, used to inform the user 831 * @param procOwner name of the owner of the procedure, used to inform the user 832 * @param exception the failure to report to the user 833 */ 834 public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, 835 IOException exception) { 836 if (nonceKey == null) { 837 return; 838 } 839 840 Long procId = nonceKeysToProcIdsMap.get(nonceKey); 841 if (procId == null || completed.containsKey(procId)) { 842 return; 843 } 844 845 Procedure<TEnvironment> proc = 846 new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); 847 848 completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc)); 849 } 850 851 // ========================================================================== 852 // Submit/Abort Procedure 853 // ========================================================================== 854 /** 855 * Add a new root-procedure to the executor. 856 * @param proc the new procedure to execute. 857 * @return the procedure id, that can be used to monitor the operation 858 */ 859 public long submitProcedure(Procedure<TEnvironment> proc) { 860 return submitProcedure(proc, null); 861 } 862 863 /** 864 * Bypass a procedure. If the procedure is set to bypass, all the logic in 865 * execute/rollback will be ignored and it will return success, whatever. 866 * It is used to recover buggy stuck procedures, releasing the lock resources 867 * and letting other procedures run. Bypassing one procedure (and its ancestors will 868 * be bypassed automatically) may leave the cluster in a middle state, e.g. region 869 * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, 870 * the operators may have to do some clean up on hdfs or schedule some assign procedures 871 * to let region online. DO AT YOUR OWN RISK. 
872 * <p> 873 * A procedure can be bypassed only if 874 * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT 875 * or it is a root procedure without any child. 876 * 2. No other worker thread is executing it 877 * 3. No child procedure has been submitted 878 * 879 * <p> 880 * If all the requirements are meet, the procedure and its ancestors will be 881 * bypassed and persisted to WAL. 882 * 883 * <p> 884 * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. 885 * TODO: What about WAITING_TIMEOUT? 886 * @param pids the procedure id 887 * @param lockWait time to wait lock 888 * @param force if force set to true, we will bypass the procedure even if it is executing. 889 * This is for procedures which can't break out during executing(due to bug, mostly) 890 * In this case, bypassing the procedure is not enough, since it is already stuck 891 * there. We need to restart the master after bypassing, and letting the problematic 892 * procedure to execute wth bypass=true, so in that condition, the procedure can be 893 * successfully bypassed. 894 * @param recursive We will do an expensive search for children of each pid. EXPENSIVE! 
895 * @return true if bypass success 896 * @throws IOException IOException 897 */ 898 public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, 899 boolean recursive) 900 throws IOException { 901 List<Boolean> result = new ArrayList<Boolean>(pids.size()); 902 for(long pid: pids) { 903 result.add(bypassProcedure(pid, lockWait, force, recursive)); 904 } 905 return result; 906 } 907 908 boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) 909 throws IOException { 910 Preconditions.checkArgument(lockWait > 0, "lockWait should be positive"); 911 final Procedure<TEnvironment> procedure = getProcedure(pid); 912 if (procedure == null) { 913 LOG.debug("Procedure pid={} does not exist, skipping bypass", pid); 914 return false; 915 } 916 917 LOG.debug("Begin bypass {} with lockWait={}, override={}, recursive={}", 918 procedure, lockWait, override, recursive); 919 IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); 920 if (lockEntry == null && !override) { 921 LOG.debug("Waited {} ms, but {} is still running, skipping bypass with force={}", 922 lockWait, procedure, override); 923 return false; 924 } else if (lockEntry == null) { 925 LOG.debug("Waited {} ms, but {} is still running, begin bypass with force={}", 926 lockWait, procedure, override); 927 } 928 try { 929 // check whether the procedure is already finished 930 if (procedure.isFinished()) { 931 LOG.debug("{} is already finished, skipping bypass", procedure); 932 return false; 933 } 934 935 if (procedure.hasChildren()) { 936 if (recursive) { 937 // EXPENSIVE. Checks each live procedure of which there could be many!!! 938 // Is there another way to get children of a procedure? 939 LOG.info("Recursive bypass on children of pid={}", procedure.getProcId()); 940 this.procedures.forEachValue(1 /*Single-threaded*/, 941 // Transformer 942 v -> v.getParentProcId() == procedure.getProcId()? 
v: null, 943 // Consumer 944 v -> { 945 try { 946 bypassProcedure(v.getProcId(), lockWait, override, recursive); 947 } catch (IOException e) { 948 LOG.warn("Recursive bypass of pid={}", v.getProcId(), e); 949 } 950 }); 951 } else { 952 LOG.debug("{} has children, skipping bypass", procedure); 953 return false; 954 } 955 } 956 957 // If the procedure has no parent or no child, we are safe to bypass it in whatever state 958 if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE 959 && procedure.getState() != ProcedureState.WAITING 960 && procedure.getState() != ProcedureState.WAITING_TIMEOUT) { 961 LOG.debug("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " 962 + "(with no parent), {}", 963 procedure); 964 // Question: how is the bypass done here? 965 return false; 966 } 967 968 // Now, the procedure is not finished, and no one can execute it since we take the lock now 969 // And we can be sure that its ancestor is not running too, since their child has not 970 // finished yet 971 Procedure<TEnvironment> current = procedure; 972 while (current != null) { 973 LOG.debug("Bypassing {}", current); 974 current.bypass(getEnvironment()); 975 store.update(procedure); 976 long parentID = current.getParentProcId(); 977 current = getProcedure(parentID); 978 } 979 980 //wake up waiting procedure, already checked there is no child 981 if (procedure.getState() == ProcedureState.WAITING) { 982 procedure.setState(ProcedureState.RUNNABLE); 983 store.update(procedure); 984 } 985 986 // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. 
987 // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE 988 if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 989 LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); 990 if (timeoutExecutor.remove(procedure)) { 991 LOG.debug("removed procedure {} from timeoutExecutor", procedure); 992 timeoutExecutor.executeTimedoutProcedure(procedure); 993 } 994 } else if (lockEntry != null) { 995 scheduler.addFront(procedure); 996 LOG.debug("Bypassing {} and its ancestors successfully, adding to queue", procedure); 997 } else { 998 // If we don't have the lock, we can't re-submit the queue, 999 // since it is already executing. To get rid of the stuck situation, we 1000 // need to restart the master. With the procedure set to bypass, the procedureExecutor 1001 // will bypass it and won't get stuck again. 1002 LOG.debug("Bypassing {} and its ancestors successfully, but since it is already running, " 1003 + "skipping add to queue", 1004 procedure); 1005 } 1006 return true; 1007 1008 } finally { 1009 if (lockEntry != null) { 1010 procExecutionLock.releaseLockEntry(lockEntry); 1011 } 1012 } 1013 } 1014 1015 /** 1016 * Add a new root-procedure to the executor. 1017 * @param proc the new procedure to execute. 1018 * @param nonceKey the registered unique identifier for this operation from the client or process. 
1019 * @return the procedure id, that can be used to monitor the operation 1020 */ 1021 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", 1022 justification = "FindBugs is blind to the check-for-null") 1023 public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { 1024 Preconditions.checkArgument(lastProcId.get() >= 0); 1025 1026 prepareProcedure(proc); 1027 1028 final Long currentProcId; 1029 if (nonceKey != null) { 1030 currentProcId = nonceKeysToProcIdsMap.get(nonceKey); 1031 Preconditions.checkArgument(currentProcId != null, 1032 "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc); 1033 } else { 1034 currentProcId = nextProcId(); 1035 } 1036 1037 // Initialize the procedure 1038 proc.setNonceKey(nonceKey); 1039 proc.setProcId(currentProcId.longValue()); 1040 1041 // Commit the transaction 1042 store.insert(proc, null); 1043 LOG.debug("Stored {}", proc); 1044 1045 // Add the procedure to the executor 1046 return pushProcedure(proc); 1047 } 1048 1049 /** 1050 * Add a set of new root-procedure to the executor. 1051 * @param procs the new procedures to execute. 1052 */ 1053 // TODO: Do we need to take nonces here? 
1054 public void submitProcedures(Procedure<TEnvironment>[] procs) { 1055 Preconditions.checkArgument(lastProcId.get() >= 0); 1056 if (procs == null || procs.length <= 0) { 1057 return; 1058 } 1059 1060 // Prepare procedure 1061 for (int i = 0; i < procs.length; ++i) { 1062 prepareProcedure(procs[i]).setProcId(nextProcId()); 1063 } 1064 1065 // Commit the transaction 1066 store.insert(procs); 1067 if (LOG.isDebugEnabled()) { 1068 LOG.debug("Stored " + Arrays.toString(procs)); 1069 } 1070 1071 // Add the procedure to the executor 1072 for (int i = 0; i < procs.length; ++i) { 1073 pushProcedure(procs[i]); 1074 } 1075 } 1076 1077 private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { 1078 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); 1079 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); 1080 if (this.checkOwnerSet) { 1081 Preconditions.checkArgument(proc.hasOwner(), "missing owner"); 1082 } 1083 return proc; 1084 } 1085 1086 private long pushProcedure(Procedure<TEnvironment> proc) { 1087 final long currentProcId = proc.getProcId(); 1088 1089 // Update metrics on start of a procedure 1090 proc.updateMetricsOnSubmit(getEnvironment()); 1091 1092 // Create the rollback stack for the procedure 1093 RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); 1094 rollbackStack.put(currentProcId, stack); 1095 1096 // Submit the new subprocedures 1097 assert !procedures.containsKey(currentProcId); 1098 procedures.put(currentProcId, proc); 1099 sendProcedureAddedNotification(currentProcId); 1100 scheduler.addBack(proc); 1101 return proc.getProcId(); 1102 } 1103 1104 /** 1105 * Send an abort notification the specified procedure. 1106 * Depending on the procedure implementation the abort can be considered or ignored. 1107 * @param procId the procedure to abort 1108 * @return true if the procedure exists and has received the abort, otherwise false. 
1109 */ 1110 public boolean abort(long procId) { 1111 return abort(procId, true); 1112 } 1113 1114 /** 1115 * Send an abort notification to the specified procedure. 1116 * Depending on the procedure implementation, the abort can be considered or ignored. 1117 * @param procId the procedure to abort 1118 * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? 1119 * @return true if the procedure exists and has received the abort, otherwise false. 1120 */ 1121 public boolean abort(long procId, boolean mayInterruptIfRunning) { 1122 Procedure<TEnvironment> proc = procedures.get(procId); 1123 if (proc != null) { 1124 if (!mayInterruptIfRunning && proc.wasExecuted()) { 1125 return false; 1126 } 1127 return proc.abort(getEnvironment()); 1128 } 1129 return false; 1130 } 1131 1132 // ========================================================================== 1133 // Executor query helpers 1134 // ========================================================================== 1135 public Procedure<TEnvironment> getProcedure(final long procId) { 1136 return procedures.get(procId); 1137 } 1138 1139 public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { 1140 Procedure<TEnvironment> proc = getProcedure(procId); 1141 if (clazz.isInstance(proc)) { 1142 return clazz.cast(proc); 1143 } 1144 return null; 1145 } 1146 1147 public Procedure<TEnvironment> getResult(long procId) { 1148 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1149 if (retainer == null) { 1150 return null; 1151 } else { 1152 return retainer.getProcedure(); 1153 } 1154 } 1155 1156 /** 1157 * Return true if the procedure is finished. 1158 * The state may be "completed successfully" or "failed and rolledback". 1159 * Use getResult() to check the state or get the result data. 1160 * @param procId the ID of the procedure to check 1161 * @return true if the procedure execution is finished, otherwise false. 
1162 */ 1163 public boolean isFinished(final long procId) { 1164 return !procedures.containsKey(procId); 1165 } 1166 1167 /** 1168 * Return true if the procedure is started. 1169 * @param procId the ID of the procedure to check 1170 * @return true if the procedure execution is started, otherwise false. 1171 */ 1172 public boolean isStarted(long procId) { 1173 Procedure<?> proc = procedures.get(procId); 1174 if (proc == null) { 1175 return completed.get(procId) != null; 1176 } 1177 return proc.wasExecuted(); 1178 } 1179 1180 /** 1181 * Mark the specified completed procedure, as ready to remove. 1182 * @param procId the ID of the procedure to remove 1183 */ 1184 public void removeResult(long procId) { 1185 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1186 if (retainer == null) { 1187 assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; 1188 LOG.debug("pid={} already removed by the cleaner.", procId); 1189 return; 1190 } 1191 1192 // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 1193 retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); 1194 } 1195 1196 public Procedure<TEnvironment> getResultOrProcedure(long procId) { 1197 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1198 if (retainer == null) { 1199 return procedures.get(procId); 1200 } else { 1201 return retainer.getProcedure(); 1202 } 1203 } 1204 1205 /** 1206 * Check if the user is this procedure's owner 1207 * @param procId the target procedure 1208 * @param user the user 1209 * @return true if the user is the owner of the procedure, 1210 * false otherwise or the owner is unknown. 
1211 */ 1212 public boolean isProcedureOwner(long procId, User user) { 1213 if (user == null) { 1214 return false; 1215 } 1216 final Procedure<TEnvironment> runningProc = procedures.get(procId); 1217 if (runningProc != null) { 1218 return runningProc.getOwner().equals(user.getShortName()); 1219 } 1220 1221 final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1222 if (retainer != null) { 1223 return retainer.getProcedure().getOwner().equals(user.getShortName()); 1224 } 1225 1226 // Procedure either does not exist or has already completed and got cleaned up. 1227 // At this time, we cannot check the owner of the procedure 1228 return false; 1229 } 1230 1231 /** 1232 * Should only be used when starting up, where the procedure workers have not been started. 1233 * <p/> 1234 * If the procedure works has been started, the return values maybe changed when you are 1235 * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as 1236 * it will do a copy, and also include the finished procedures. 1237 */ 1238 public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() { 1239 return procedures.values(); 1240 } 1241 1242 /** 1243 * Get procedures. 1244 * @return the procedures in a list 1245 */ 1246 public List<Procedure<TEnvironment>> getProcedures() { 1247 List<Procedure<TEnvironment>> procedureList = 1248 new ArrayList<>(procedures.size() + completed.size()); 1249 procedureList.addAll(procedures.values()); 1250 // Note: The procedure could show up twice in the list with different state, as 1251 // it could complete after we walk through procedures list and insert into 1252 // procedureList - it is ok, as we will use the information in the Procedure 1253 // to figure it out; to prevent this would increase the complexity of the logic. 
1254 completed.values().stream().map(CompletedProcedureRetainer::getProcedure) 1255 .forEach(procedureList::add); 1256 return procedureList; 1257 } 1258 1259 // ========================================================================== 1260 // Listeners helpers 1261 // ========================================================================== 1262 public void registerListener(ProcedureExecutorListener listener) { 1263 this.listeners.add(listener); 1264 } 1265 1266 public boolean unregisterListener(ProcedureExecutorListener listener) { 1267 return this.listeners.remove(listener); 1268 } 1269 1270 private void sendProcedureLoadedNotification(final long procId) { 1271 if (!this.listeners.isEmpty()) { 1272 for (ProcedureExecutorListener listener: this.listeners) { 1273 try { 1274 listener.procedureLoaded(procId); 1275 } catch (Throwable e) { 1276 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1277 } 1278 } 1279 } 1280 } 1281 1282 private void sendProcedureAddedNotification(final long procId) { 1283 if (!this.listeners.isEmpty()) { 1284 for (ProcedureExecutorListener listener: this.listeners) { 1285 try { 1286 listener.procedureAdded(procId); 1287 } catch (Throwable e) { 1288 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1289 } 1290 } 1291 } 1292 } 1293 1294 private void sendProcedureFinishedNotification(final long procId) { 1295 if (!this.listeners.isEmpty()) { 1296 for (ProcedureExecutorListener listener: this.listeners) { 1297 try { 1298 listener.procedureFinished(procId); 1299 } catch (Throwable e) { 1300 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1301 } 1302 } 1303 } 1304 } 1305 1306 // ========================================================================== 1307 // Procedure IDs helpers 1308 // ========================================================================== 1309 private long nextProcId() { 1310 long procId = lastProcId.incrementAndGet(); 1311 if (procId < 0) { 
1312 while (!lastProcId.compareAndSet(procId, 0)) { 1313 procId = lastProcId.get(); 1314 if (procId >= 0) { 1315 break; 1316 } 1317 } 1318 while (procedures.containsKey(procId)) { 1319 procId = lastProcId.incrementAndGet(); 1320 } 1321 } 1322 assert procId >= 0 : "Invalid procId " + procId; 1323 return procId; 1324 } 1325 1326 @VisibleForTesting 1327 protected long getLastProcId() { 1328 return lastProcId.get(); 1329 } 1330 1331 @VisibleForTesting 1332 public Set<Long> getActiveProcIds() { 1333 return procedures.keySet(); 1334 } 1335 1336 Long getRootProcedureId(Procedure<TEnvironment> proc) { 1337 return Procedure.getRootProcedureId(procedures, proc); 1338 } 1339 1340 // ========================================================================== 1341 // Executions 1342 // ========================================================================== 1343 private void executeProcedure(Procedure<TEnvironment> proc) { 1344 if (proc.isFinished()) { 1345 LOG.debug("{} is already finished, skipping execution", proc); 1346 return; 1347 } 1348 final Long rootProcId = getRootProcedureId(proc); 1349 if (rootProcId == null) { 1350 // The 'proc' was ready to run but the root procedure was rolledback 1351 LOG.warn("Rollback because parent is done/rolledback proc=" + proc); 1352 executeRollback(proc); 1353 return; 1354 } 1355 1356 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 1357 if (procStack == null) { 1358 LOG.warn("RootProcedureState is null for " + proc.getProcId()); 1359 return; 1360 } 1361 do { 1362 // Try to acquire the execution 1363 if (!procStack.acquire(proc)) { 1364 if (procStack.setRollback()) { 1365 // we have the 'rollback-lock' we can start rollingback 1366 switch (executeRollback(rootProcId, procStack)) { 1367 case LOCK_ACQUIRED: 1368 break; 1369 case LOCK_YIELD_WAIT: 1370 procStack.unsetRollback(); 1371 scheduler.yield(proc); 1372 break; 1373 case LOCK_EVENT_WAIT: 1374 LOG.info("LOCK_EVENT_WAIT rollback..." 
+ proc); 1375 procStack.unsetRollback(); 1376 break; 1377 default: 1378 throw new UnsupportedOperationException(); 1379 } 1380 } else { 1381 // if we can't rollback means that some child is still running. 1382 // the rollback will be executed after all the children are done. 1383 // If the procedure was never executed, remove and mark it as rolledback. 1384 if (!proc.wasExecuted()) { 1385 switch (executeRollback(proc)) { 1386 case LOCK_ACQUIRED: 1387 break; 1388 case LOCK_YIELD_WAIT: 1389 scheduler.yield(proc); 1390 break; 1391 case LOCK_EVENT_WAIT: 1392 LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); 1393 break; 1394 default: 1395 throw new UnsupportedOperationException(); 1396 } 1397 } 1398 } 1399 break; 1400 } 1401 1402 // Execute the procedure 1403 assert proc.getState() == ProcedureState.RUNNABLE : proc; 1404 // Note that lock is NOT about concurrency but rather about ensuring 1405 // ownership of a procedure of an entity such as a region or table 1406 LockState lockState = acquireLock(proc); 1407 switch (lockState) { 1408 case LOCK_ACQUIRED: 1409 execProcedure(procStack, proc); 1410 break; 1411 case LOCK_YIELD_WAIT: 1412 LOG.info(lockState + " " + proc); 1413 scheduler.yield(proc); 1414 break; 1415 case LOCK_EVENT_WAIT: 1416 // Someone will wake us up when the lock is available 1417 LOG.debug(lockState + " " + proc); 1418 break; 1419 default: 1420 throw new UnsupportedOperationException(); 1421 } 1422 procStack.release(proc); 1423 1424 if (proc.isSuccess()) { 1425 // update metrics on finishing the procedure 1426 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); 1427 LOG.info("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); 1428 // Finalize the procedure state 1429 if (proc.getProcId() == rootProcId) { 1430 procedureFinished(proc); 1431 } else { 1432 execCompletionCleanup(proc); 1433 } 1434 break; 1435 } 1436 } while (procStack.isFailed()); 1437 } 1438 1439 private LockState 
acquireLock(Procedure<TEnvironment> proc) { 1440 TEnvironment env = getEnvironment(); 1441 // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if 1442 // hasLock is true. 1443 if (proc.hasLock()) { 1444 return LockState.LOCK_ACQUIRED; 1445 } 1446 return proc.doAcquireLock(env, store); 1447 } 1448 1449 private void releaseLock(Procedure<TEnvironment> proc, boolean force) { 1450 TEnvironment env = getEnvironment(); 1451 // For how the framework works, we know that we will always have the lock 1452 // when we call releaseLock(), so we can avoid calling proc.hasLock() 1453 if (force || !proc.holdLock(env) || proc.isFinished()) { 1454 proc.doReleaseLock(env, store); 1455 } 1456 } 1457 1458 /** 1459 * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the 1460 * root-procedure will be visible as finished to user, and the result will be the fatal exception. 1461 */ 1462 private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { 1463 Procedure<TEnvironment> rootProc = procedures.get(rootProcId); 1464 RemoteProcedureException exception = rootProc.getException(); 1465 // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are 1466 // rolling back because the subprocedure does. Clarify. 1467 if (exception == null) { 1468 exception = procStack.getException(); 1469 rootProc.setFailure(exception); 1470 store.update(rootProc); 1471 } 1472 1473 List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); 1474 assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; 1475 1476 int stackTail = subprocStack.size(); 1477 while (stackTail-- > 0) { 1478 Procedure<TEnvironment> proc = subprocStack.get(stackTail); 1479 IdLock.Entry lockEntry = null; 1480 // Hold the execution lock if it is not held by us. 
The IdLock is not reentrant so we need 1481 // this check, as the worker will hold the lock before executing a procedure. This is the only 1482 // place where we may hold two procedure execution locks, and there is a fence in the 1483 // RootProcedureState where we can make sure that only one worker can execute the rollback of 1484 // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to 1485 // prevent race between us and the force update thread. 1486 if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) { 1487 try { 1488 lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1489 } catch (IOException e) { 1490 // can only happen if interrupted, so not a big deal to propagate it 1491 throw new UncheckedIOException(e); 1492 } 1493 } 1494 try { 1495 // For the sub procedures which are successfully finished, we do not rollback them. 1496 // Typically, if we want to rollback a procedure, we first need to rollback it, and then 1497 // recursively rollback its ancestors. The state changes which are done by sub procedures 1498 // should be handled by parent procedures when rolling back. For example, when rolling back 1499 // a MergeTableProcedure, we will schedule new procedures to bring the offline regions 1500 // online, instead of rolling back the original procedures which offlined the regions(in 1501 // fact these procedures can not be rolled back...). 
1502 if (proc.isSuccess()) { 1503 // Just do the cleanup work, without actually executing the rollback 1504 subprocStack.remove(stackTail); 1505 cleanupAfterRollbackOneStep(proc); 1506 continue; 1507 } 1508 LockState lockState = acquireLock(proc); 1509 if (lockState != LockState.LOCK_ACQUIRED) { 1510 // can't take a lock on the procedure, add the root-proc back on the 1511 // queue waiting for the lock availability 1512 return lockState; 1513 } 1514 1515 lockState = executeRollback(proc); 1516 releaseLock(proc, false); 1517 boolean abortRollback = lockState != LockState.LOCK_ACQUIRED; 1518 abortRollback |= !isRunning() || !store.isRunning(); 1519 1520 // allows to kill the executor before something is stored to the wal. 1521 // useful to test the procedure recovery. 1522 if (abortRollback) { 1523 return lockState; 1524 } 1525 1526 subprocStack.remove(stackTail); 1527 1528 // if the procedure is kind enough to pass the slot to someone else, yield 1529 // if the proc is already finished, do not yield 1530 if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { 1531 return LockState.LOCK_YIELD_WAIT; 1532 } 1533 1534 if (proc != rootProc) { 1535 execCompletionCleanup(proc); 1536 } 1537 } finally { 1538 if (lockEntry != null) { 1539 procExecutionLock.releaseLockEntry(lockEntry); 1540 } 1541 } 1542 } 1543 1544 // Finalize the procedure state 1545 LOG.info("Rolled back {} exec-time={}", rootProc, 1546 StringUtils.humanTimeDiff(rootProc.elapsedTime())); 1547 procedureFinished(rootProc); 1548 return LockState.LOCK_ACQUIRED; 1549 } 1550 1551 private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) { 1552 if (proc.removeStackIndex()) { 1553 if (!proc.isSuccess()) { 1554 proc.setState(ProcedureState.ROLLEDBACK); 1555 } 1556 1557 // update metrics on finishing the procedure (fail) 1558 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false); 1559 1560 if (proc.hasParent()) { 1561 store.delete(proc.getProcId()); 1562 
procedures.remove(proc.getProcId()); 1563 } else { 1564 final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); 1565 if (childProcIds != null) { 1566 store.delete(proc, childProcIds); 1567 } else { 1568 store.update(proc); 1569 } 1570 } 1571 } else { 1572 store.update(proc); 1573 } 1574 } 1575 1576 /** 1577 * Execute the rollback of the procedure step. 1578 * It updates the store with the new state (stack index) 1579 * or will remove completly the procedure in case it is a child. 1580 */ 1581 private LockState executeRollback(Procedure<TEnvironment> proc) { 1582 try { 1583 proc.doRollback(getEnvironment()); 1584 } catch (IOException e) { 1585 LOG.debug("Roll back attempt failed for {}", proc, e); 1586 return LockState.LOCK_YIELD_WAIT; 1587 } catch (InterruptedException e) { 1588 handleInterruptedException(proc, e); 1589 return LockState.LOCK_YIELD_WAIT; 1590 } catch (Throwable e) { 1591 // Catch NullPointerExceptions or similar errors... 1592 LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); 1593 } 1594 1595 // allows to kill the executor before something is stored to the wal. 1596 // useful to test the procedure recovery. 1597 if (testing != null && testing.shouldKillBeforeStoreUpdate()) { 1598 String msg = "TESTING: Kill before store update"; 1599 LOG.debug(msg); 1600 stop(); 1601 throw new RuntimeException(msg); 1602 } 1603 1604 cleanupAfterRollbackOneStep(proc); 1605 1606 return LockState.LOCK_ACQUIRED; 1607 } 1608 1609 private void yieldProcedure(Procedure<TEnvironment> proc) { 1610 releaseLock(proc, false); 1611 scheduler.yield(proc); 1612 } 1613 1614 /** 1615 * Executes <code>procedure</code> 1616 * <ul> 1617 * <li>Calls the doExecute() of the procedure 1618 * <li>If the procedure execution didn't fail (i.e. valid user input) 1619 * <ul> 1620 * <li>...and returned subprocedures 1621 * <ul><li>The subprocedures are initialized. 
1622 * <li>The subprocedures are added to the store 1623 * <li>The subprocedures are added to the runnable queue 1624 * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete 1625 * </ul> 1626 * </li> 1627 * <li>...if there are no subprocedure 1628 * <ul><li>the procedure completed successfully 1629 * <li>if there is a parent (WAITING) 1630 * <li>the parent state will be set to RUNNABLE 1631 * </ul> 1632 * </li> 1633 * </ul> 1634 * </li> 1635 * <li>In case of failure 1636 * <ul> 1637 * <li>The store is updated with the new state</li> 1638 * <li>The executor (caller of this method) will start the rollback of the procedure</li> 1639 * </ul> 1640 * </li> 1641 * </ul> 1642 */ 1643 private void execProcedure(RootProcedureState<TEnvironment> procStack, 1644 Procedure<TEnvironment> procedure) { 1645 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 1646 "NOT RUNNABLE! " + procedure.toString()); 1647 1648 // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. 1649 // The exception is caught below and then we hurry to the exit without disturbing state. The 1650 // idea is that the processing of this procedure will be unsuspended later by an external event 1651 // such the report of a region open. 1652 boolean suspended = false; 1653 1654 // Whether to 're-' -execute; run through the loop again. 
1655 boolean reExecute = false; 1656 1657 Procedure<TEnvironment>[] subprocs = null; 1658 do { 1659 reExecute = false; 1660 procedure.resetPersistence(); 1661 try { 1662 subprocs = procedure.doExecute(getEnvironment()); 1663 if (subprocs != null && subprocs.length == 0) { 1664 subprocs = null; 1665 } 1666 } catch (ProcedureSuspendedException e) { 1667 LOG.trace("Suspend {}", procedure); 1668 suspended = true; 1669 } catch (ProcedureYieldException e) { 1670 LOG.trace("Yield {}", procedure, e); 1671 yieldProcedure(procedure); 1672 return; 1673 } catch (InterruptedException e) { 1674 LOG.trace("Yield interrupt {}", procedure, e); 1675 handleInterruptedException(procedure, e); 1676 yieldProcedure(procedure); 1677 return; 1678 } catch (Throwable e) { 1679 // Catch NullPointerExceptions or similar errors... 1680 String msg = "CODE-BUG: Uncaught runtime exception: " + procedure; 1681 LOG.error(msg, e); 1682 procedure.setFailure(new RemoteProcedureException(msg, e)); 1683 } 1684 1685 if (!procedure.isFailed()) { 1686 if (subprocs != null) { 1687 if (subprocs.length == 1 && subprocs[0] == procedure) { 1688 // Procedure returned itself. Quick-shortcut for a state machine-like procedure; 1689 // i.e. we go around this loop again rather than go back out on the scheduler queue. 1690 subprocs = null; 1691 reExecute = true; 1692 LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); 1693 } else { 1694 // Yield the current procedure, and make the subprocedure runnable 1695 // subprocs may come back 'null'. 1696 subprocs = initializeChildren(procStack, procedure, subprocs); 1697 LOG.info("Initialized subprocedures=" + 1698 (subprocs == null? null: 1699 Stream.of(subprocs).map(e -> "{" + e.toString() + "}"). 
1700 collect(Collectors.toList()).toString())); 1701 } 1702 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1703 LOG.trace("Added to timeoutExecutor {}", procedure); 1704 timeoutExecutor.add(procedure); 1705 } else if (!suspended) { 1706 // No subtask, so we are done 1707 procedure.setState(ProcedureState.SUCCESS); 1708 } 1709 } 1710 1711 // Add the procedure to the stack 1712 procStack.addRollbackStep(procedure); 1713 1714 // allows to kill the executor before something is stored to the wal. 1715 // useful to test the procedure recovery. 1716 if (testing != null && 1717 testing.shouldKillBeforeStoreUpdate(suspended, procedure.hasParent())) { 1718 kill("TESTING: Kill BEFORE store update: " + procedure); 1719 } 1720 1721 // TODO: The code here doesn't check if store is running before persisting to the store as 1722 // it relies on the method call below to throw RuntimeException to wind up the stack and 1723 // executor thread to stop. The statement following the method call below seems to check if 1724 // store is not running, to prevent scheduling children procedures, re-execution or yield 1725 // of this procedure. This may need more scrutiny and subsequent cleanup in future 1726 // 1727 // Commit the transaction even if a suspend (state may have changed). Note this append 1728 // can take a bunch of time to complete. 
1729 if (procedure.needPersistence()) { 1730 updateStoreOnExec(procStack, procedure, subprocs); 1731 } 1732 1733 // if the store is not running we are aborting 1734 if (!store.isRunning()) { 1735 return; 1736 } 1737 // if the procedure is kind enough to pass the slot to someone else, yield 1738 if (procedure.isRunnable() && !suspended && 1739 procedure.isYieldAfterExecutionStep(getEnvironment())) { 1740 yieldProcedure(procedure); 1741 return; 1742 } 1743 1744 assert (reExecute && subprocs == null) || !reExecute; 1745 } while (reExecute); 1746 1747 // Allows to kill the executor after something is stored to the WAL but before the below 1748 // state settings are done -- in particular the one on the end where we make parent 1749 // RUNNABLE again when its children are done; see countDownChildren. 1750 if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) { 1751 kill("TESTING: Kill AFTER store update: " + procedure); 1752 } 1753 1754 // Submit the new subprocedures 1755 if (subprocs != null && !procedure.isFailed()) { 1756 submitChildrenProcedures(subprocs); 1757 } 1758 1759 // we need to log the release lock operation before waking up the parent procedure, as there 1760 // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all 1761 // the sub procedures from store and cause problems... 1762 releaseLock(procedure, false); 1763 1764 // if the procedure is complete and has a parent, count down the children latch. 1765 // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 
1766 if (!suspended && procedure.isFinished() && procedure.hasParent()) { 1767 countDownChildren(procStack, procedure); 1768 } 1769 } 1770 1771 private void kill(String msg) { 1772 LOG.debug(msg); 1773 stop(); 1774 throw new RuntimeException(msg); 1775 } 1776 1777 private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, 1778 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1779 assert subprocs != null : "expected subprocedures"; 1780 final long rootProcId = getRootProcedureId(procedure); 1781 for (int i = 0; i < subprocs.length; ++i) { 1782 Procedure<TEnvironment> subproc = subprocs[i]; 1783 if (subproc == null) { 1784 String msg = "subproc[" + i + "] is null, aborting the procedure"; 1785 procedure.setFailure(new RemoteProcedureException(msg, 1786 new IllegalArgumentIOException(msg))); 1787 return null; 1788 } 1789 1790 assert subproc.getState() == ProcedureState.INITIALIZING : subproc; 1791 subproc.setParentProcId(procedure.getProcId()); 1792 subproc.setRootProcId(rootProcId); 1793 subproc.setProcId(nextProcId()); 1794 procStack.addSubProcedure(subproc); 1795 } 1796 1797 if (!procedure.isFailed()) { 1798 procedure.setChildrenLatch(subprocs.length); 1799 switch (procedure.getState()) { 1800 case RUNNABLE: 1801 procedure.setState(ProcedureState.WAITING); 1802 break; 1803 case WAITING_TIMEOUT: 1804 timeoutExecutor.add(procedure); 1805 break; 1806 default: 1807 break; 1808 } 1809 } 1810 return subprocs; 1811 } 1812 1813 private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { 1814 for (int i = 0; i < subprocs.length; ++i) { 1815 Procedure<TEnvironment> subproc = subprocs[i]; 1816 subproc.updateMetricsOnSubmit(getEnvironment()); 1817 assert !procedures.containsKey(subproc.getProcId()); 1818 procedures.put(subproc.getProcId(), subproc); 1819 scheduler.addFront(subproc); 1820 } 1821 } 1822 1823 private void countDownChildren(RootProcedureState<TEnvironment> procStack, 1824 
Procedure<TEnvironment> procedure) { 1825 Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); 1826 if (parent == null) { 1827 assert procStack.isRollingback(); 1828 return; 1829 } 1830 1831 // If this procedure is the last child awake the parent procedure 1832 if (parent.tryRunnable()) { 1833 // If we succeeded in making the parent runnable -- i.e. all of its 1834 // children have completed, move parent to front of the queue. 1835 store.update(parent); 1836 scheduler.addFront(parent); 1837 LOG.info("Finished subprocedure pid={}, resume processing parent {}", 1838 procedure.getProcId(), parent); 1839 return; 1840 } 1841 } 1842 1843 private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, 1844 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1845 if (subprocs != null && !procedure.isFailed()) { 1846 if (LOG.isTraceEnabled()) { 1847 LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); 1848 } 1849 store.insert(procedure, subprocs); 1850 } else { 1851 LOG.trace("Store update {}", procedure); 1852 if (procedure.isFinished() && !procedure.hasParent()) { 1853 // remove child procedures 1854 final long[] childProcIds = procStack.getSubprocedureIds(); 1855 if (childProcIds != null) { 1856 store.delete(procedure, childProcIds); 1857 for (int i = 0; i < childProcIds.length; ++i) { 1858 procedures.remove(childProcIds[i]); 1859 } 1860 } else { 1861 store.update(procedure); 1862 } 1863 } else { 1864 store.update(procedure); 1865 } 1866 } 1867 } 1868 1869 private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { 1870 LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); 1871 // NOTE: We don't call Thread.currentThread().interrupt() 1872 // because otherwise all the subsequent calls e.g. Thread.sleep() will throw 1873 // the InterruptedException. 
If the master is going down, we will be notified 1874 // and the executor/store will be stopped. 1875 // (The interrupted procedure will be retried on the next run) 1876 } 1877 1878 private void execCompletionCleanup(Procedure<TEnvironment> proc) { 1879 final TEnvironment env = getEnvironment(); 1880 if (proc.hasLock()) { 1881 LOG.warn("Usually this should not happen, we will release the lock before if the procedure" + 1882 " is finished, even if the holdLock is true, arrive here means we have some holes where" + 1883 " we do not release the lock. And the releaseLock below may fail since the procedure may" + 1884 " have already been deleted from the procedure store."); 1885 releaseLock(proc, true); 1886 } 1887 try { 1888 proc.completionCleanup(env); 1889 } catch (Throwable e) { 1890 // Catch NullPointerExceptions or similar errors... 1891 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); 1892 } 1893 } 1894 1895 private void procedureFinished(Procedure<TEnvironment> proc) { 1896 // call the procedure completion cleanup handler 1897 execCompletionCleanup(proc); 1898 1899 CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); 1900 1901 // update the executor internal state maps 1902 if (!proc.shouldWaitClientAck(getEnvironment())) { 1903 retainer.setClientAckTime(0); 1904 } 1905 1906 completed.put(proc.getProcId(), retainer); 1907 rollbackStack.remove(proc.getProcId()); 1908 procedures.remove(proc.getProcId()); 1909 1910 // call the runnableSet completion cleanup handler 1911 try { 1912 scheduler.completionCleanup(proc); 1913 } catch (Throwable e) { 1914 // Catch NullPointerExceptions or similar errors... 
1915 LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); 1916 } 1917 1918 // Notify the listeners 1919 sendProcedureFinishedNotification(proc.getProcId()); 1920 } 1921 1922 RootProcedureState<TEnvironment> getProcStack(long rootProcId) { 1923 return rollbackStack.get(rootProcId); 1924 } 1925 1926 @VisibleForTesting 1927 ProcedureScheduler getProcedureScheduler() { 1928 return scheduler; 1929 } 1930 1931 @VisibleForTesting 1932 int getCompletedSize() { 1933 return completed.size(); 1934 } 1935 1936 @VisibleForTesting 1937 public IdLock getProcExecutionLock() { 1938 return procExecutionLock; 1939 } 1940 1941 // ========================================================================== 1942 // Worker Thread 1943 // ========================================================================== 1944 private class WorkerThread extends StoppableThread { 1945 private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); 1946 private volatile Procedure<TEnvironment> activeProcedure; 1947 1948 public WorkerThread(ThreadGroup group) { 1949 this(group, "PEWorker-"); 1950 } 1951 1952 protected WorkerThread(ThreadGroup group, String prefix) { 1953 super(group, prefix + workerId.incrementAndGet()); 1954 setDaemon(true); 1955 } 1956 1957 @Override 1958 public void sendStopSignal() { 1959 scheduler.signalAll(); 1960 } 1961 @Override 1962 public void run() { 1963 long lastUpdate = EnvironmentEdgeManager.currentTime(); 1964 try { 1965 while (isRunning() && keepAlive(lastUpdate)) { 1966 @SuppressWarnings("unchecked") 1967 Procedure<TEnvironment> proc = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); 1968 if (proc == null) { 1969 continue; 1970 } 1971 this.activeProcedure = proc; 1972 int activeCount = activeExecutorCount.incrementAndGet(); 1973 int runningCount = store.setRunningProcedureCount(activeCount); 1974 LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), 1975 runningCount, activeCount); 1976 
executionStartTime.set(EnvironmentEdgeManager.currentTime()); 1977 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1978 try { 1979 executeProcedure(proc); 1980 } catch (AssertionError e) { 1981 LOG.info("ASSERT pid=" + proc.getProcId(), e); 1982 throw e; 1983 } finally { 1984 procExecutionLock.releaseLockEntry(lockEntry); 1985 activeCount = activeExecutorCount.decrementAndGet(); 1986 runningCount = store.setRunningProcedureCount(activeCount); 1987 LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), 1988 runningCount, activeCount); 1989 this.activeProcedure = null; 1990 lastUpdate = EnvironmentEdgeManager.currentTime(); 1991 executionStartTime.set(Long.MAX_VALUE); 1992 } 1993 } 1994 } catch (Throwable t) { 1995 LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); 1996 } finally { 1997 LOG.trace("Worker terminated."); 1998 } 1999 workerThreads.remove(this); 2000 } 2001 2002 @Override 2003 public String toString() { 2004 Procedure<?> p = this.activeProcedure; 2005 return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")"); 2006 } 2007 2008 /** 2009 * @return the time since the current procedure is running 2010 */ 2011 public long getCurrentRunTime() { 2012 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); 2013 } 2014 2015 // core worker never timeout 2016 protected boolean keepAlive(long lastUpdate) { 2017 return true; 2018 } 2019 } 2020 2021 // A worker thread which can be added when core workers are stuck. Will timeout after 2022 // keepAliveTime if there is no procedure to run. 
2023 private final class KeepAliveWorkerThread extends WorkerThread { 2024 public KeepAliveWorkerThread(ThreadGroup group) { 2025 super(group, "KeepAlivePEWorker-"); 2026 } 2027 2028 @Override 2029 protected boolean keepAlive(long lastUpdate) { 2030 return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; 2031 } 2032 } 2033 2034 // ---------------------------------------------------------------------------- 2035 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the 2036 // full set of procedures pending and completed to write a compacted 2037 // version of the log (in case is a log)? 2038 // In theory no, procedures are have a short life, so at some point the store 2039 // will have the tracker saying everything is in the last log. 2040 // ---------------------------------------------------------------------------- 2041 2042 private final class WorkerMonitor extends InlineChore { 2043 public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = 2044 "hbase.procedure.worker.monitor.interval.msec"; 2045 private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec 2046 2047 public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = 2048 "hbase.procedure.worker.stuck.threshold.msec"; 2049 private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec 2050 2051 public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = 2052 "hbase.procedure.worker.add.stuck.percentage"; 2053 private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck 2054 2055 private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; 2056 private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; 2057 private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; 2058 2059 public WorkerMonitor() { 2060 refreshConfig(); 2061 } 2062 2063 @Override 2064 public void run() { 2065 final int stuckCount = checkForStuckWorkers(); 2066 checkThreadCount(stuckCount); 2067 2068 // refresh interval 
(poor man dynamic conf update) 2069 refreshConfig(); 2070 } 2071 2072 private int checkForStuckWorkers() { 2073 // check if any of the worker is stuck 2074 int stuckCount = 0; 2075 for (WorkerThread worker : workerThreads) { 2076 if (worker.getCurrentRunTime() < stuckThreshold) { 2077 continue; 2078 } 2079 2080 // WARN the worker is stuck 2081 stuckCount++; 2082 LOG.warn("Worker stuck {}, run time {}", worker, 2083 StringUtils.humanTimeDiff(worker.getCurrentRunTime())); 2084 } 2085 return stuckCount; 2086 } 2087 2088 private void checkThreadCount(final int stuckCount) { 2089 // nothing to do if there are no runnable tasks 2090 if (stuckCount < 1 || !scheduler.hasRunnables()) { 2091 return; 2092 } 2093 2094 // add a new thread if the worker stuck percentage exceed the threshold limit 2095 // and every handler is active. 2096 final float stuckPerc = ((float) stuckCount) / workerThreads.size(); 2097 // let's add new worker thread more aggressively, as they will timeout finally if there is no 2098 // work to do. 2099 if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) { 2100 final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup); 2101 workerThreads.add(worker); 2102 worker.start(); 2103 LOG.debug("Added new worker thread {}", worker); 2104 } 2105 } 2106 2107 private void refreshConfig() { 2108 addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, 2109 DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); 2110 timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, 2111 DEFAULT_WORKER_MONITOR_INTERVAL); 2112 stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, 2113 DEFAULT_WORKER_STUCK_THRESHOLD); 2114 } 2115 2116 @Override 2117 public int getTimeoutInterval() { 2118 return timeoutInterval; 2119 } 2120 } 2121}